You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:34 UTC
[32/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job
module into 'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
deleted file mode 100644
index 4862bb1..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.metadata.MetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- */
-public class ExecutableDao {
-
- private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class);
- private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
- private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
- private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
- public static final String JOB_PATH_ROOT = "/execute";
- public static final String JOB_OUTPUT_ROOT = "/execute_output";
-
- private ResourceStore store;
-
- public static ExecutableDao getInstance(KylinConfig config) {
- ExecutableDao r = CACHE.get(config);
- if (r == null) {
- r = new ExecutableDao(config);
- CACHE.put(config, r);
- if (CACHE.size() > 1) {
- logger.warn("More than one singleton exist");
- }
-
- }
- return r;
- }
-
- private ExecutableDao(KylinConfig config) {
- logger.info("Using metadata url: " + config);
- this.store = MetadataManager.getInstance(config).getStore();
- }
-
- private String pathOfJob(ExecutablePO job) {
- return pathOfJob(job.getUuid());
- }
- private String pathOfJob(String uuid) {
- return JOB_PATH_ROOT + "/" + uuid;
- }
-
- private String pathOfJobOutput(String uuid) {
- return JOB_OUTPUT_ROOT + "/" + uuid;
- }
-
- private ExecutablePO readJobResource(String path) throws IOException {
- return store.getResource(path, ExecutablePO.class, JOB_SERIALIZER);
- }
-
- private void writeJobResource(String path, ExecutablePO job) throws IOException {
- store.putResource(path, job, JOB_SERIALIZER);
- }
-
- private ExecutableOutputPO readJobOutputResource(String path) throws IOException {
- return store.getResource(path, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
- }
-
- private long writeJobOutputResource(String path, ExecutableOutputPO output) throws IOException {
- return store.putResource(path, output, JOB_OUTPUT_SERIALIZER);
- }
-
- public List<ExecutableOutputPO> getJobOutputs() throws PersistentException {
- try {
- ArrayList<String> resources = store.listResources(JOB_OUTPUT_ROOT);
- if (resources == null || resources.isEmpty()) {
- return Collections.emptyList();
- }
- Collections.sort(resources);
- String rangeStart = resources.get(0);
- String rangeEnd = resources.get(resources.size() - 1);
- return store.getAllResources(rangeStart, rangeEnd, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
- } catch (IOException e) {
- logger.error("error get all Jobs:", e);
- throw new PersistentException(e);
- }
- }
-
- public List<ExecutablePO> getJobs() throws PersistentException {
- try {
- final List<String> jobIds = store.listResources(JOB_PATH_ROOT);
- if (jobIds == null || jobIds.isEmpty()) {
- return Collections.emptyList();
- }
- Collections.sort(jobIds);
- String rangeStart = jobIds.get(0);
- String rangeEnd = jobIds.get(jobIds.size() - 1);
- return store.getAllResources(rangeStart, rangeEnd, ExecutablePO.class, JOB_SERIALIZER);
- } catch (IOException e) {
- logger.error("error get all Jobs:", e);
- throw new PersistentException(e);
- }
- }
-
- public List<String> getJobIds() throws PersistentException {
- try {
- ArrayList<String> resources = store.listResources(JOB_PATH_ROOT);
- if (resources == null) {
- return Collections.emptyList();
- }
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(resources.size());
- for (String path : resources) {
- result.add(path.substring(path.lastIndexOf("/") + 1));
- }
- return result;
- } catch (IOException e) {
- logger.error("error get all Jobs:", e);
- throw new PersistentException(e);
- }
- }
-
- public ExecutablePO getJob(String uuid) throws PersistentException {
- try {
- return readJobResource(pathOfJob(uuid));
- } catch (IOException e) {
- logger.error("error get job:" + uuid, e);
- throw new PersistentException(e);
- }
- }
-
- public ExecutablePO addJob(ExecutablePO job) throws PersistentException {
- try {
- if (getJob(job.getUuid()) != null) {
- throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists");
- }
- writeJobResource(pathOfJob(job), job);
- return job;
- } catch (IOException e) {
- logger.error("error save job:" + job.getUuid(), e);
- throw new PersistentException(e);
- }
- }
-
- public void deleteJob(String uuid) throws PersistentException {
- try {
- store.deleteResource(pathOfJob(uuid));
- } catch (IOException e) {
- logger.error("error delete job:" + uuid, e);
- throw new PersistentException(e);
- }
- }
-
- public ExecutableOutputPO getJobOutput(String uuid) throws PersistentException {
- try {
- ExecutableOutputPO result = readJobOutputResource(pathOfJobOutput(uuid));
- if (result == null) {
- result = new ExecutableOutputPO();
- result.setUuid(uuid);
- return result;
- }
- return result;
- } catch (IOException e) {
- logger.error("error get job output id:" + uuid, e);
- throw new PersistentException(e);
- }
- }
-
- public void addJobOutput(ExecutableOutputPO output) throws PersistentException {
- try {
- output.setLastModified(0);
- writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
- } catch (IOException e) {
- logger.error("error update job output id:" + output.getUuid(), e);
- throw new PersistentException(e);
- }
- }
-
- public void updateJobOutput(ExecutableOutputPO output) throws PersistentException {
- try {
- final long ts = writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
- output.setLastModified(ts);
- } catch (IOException e) {
- logger.error("error update job output id:" + output.getUuid(), e);
- throw new PersistentException(e);
- }
- }
-
- public void deleteJobOutput(String uuid) throws PersistentException {
- try {
- store.deleteResource(pathOfJobOutput(uuid));
- } catch (IOException e) {
- logger.error("error delete job:" + uuid, e);
- throw new PersistentException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
deleted file mode 100644
index 4dacd8a..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ExecutableOutputPO extends RootPersistentEntity {
-
- @JsonProperty("content")
- private String content;
-
- @JsonProperty("status")
- private String status = "READY";
-
- @JsonProperty("info")
- private Map<String, String> info = Maps.newHashMap();
-
- public String getContent() {
- return content;
- }
-
- public void setContent(String content) {
- this.content = content;
- }
-
- public String getStatus() {
- return status;
- }
-
- public void setStatus(String status) {
- this.status = status;
- }
-
- public Map<String, String> getInfo() {
- return info;
- }
-
- public void setInfo(Map<String, String> info) {
- this.info = info;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
deleted file mode 100644
index 6a17b29..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import java.util.List;
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ExecutablePO extends RootPersistentEntity {
-
- @JsonProperty("name")
- private String name;
-
- @JsonProperty("tasks")
- private List<ExecutablePO> tasks;
-
- @JsonProperty("type")
- private String type;
-
- @JsonProperty("params")
- private Map<String, String> params = Maps.newHashMap();
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public List<ExecutablePO> getTasks() {
- return tasks;
- }
-
- public void setTasks(List<ExecutablePO> tasks) {
- this.tasks = tasks;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public Map<String, String> getParams() {
- return params;
- }
-
- public void setParams(Map<String, String> params) {
- this.params = params;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
index 1aeb50f..b03cb5f 100644
--- a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
@@ -27,8 +27,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-
-import org.apache.kylin.job.tools.LZOSupportnessChecker;
+import org.apache.kylin.storage.hbase.util.LZOSupportnessChecker;
/**
* <p/>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
deleted file mode 100644
index 2eb9b31..0000000
--- a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.engine;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-
-/**
- * @author ysong1
- */
-public class JobEngineConfig {
- private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class);
- public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
- public static String HIVE_CONF_FILENAME = "kylin_hive_conf";
-
- private static File getJobConfig(String fileName) {
- String path = System.getProperty(KylinConfig.KYLIN_CONF);
- if (StringUtils.isNotEmpty(path)) {
- return new File(path, fileName);
- }
-
- path = KylinConfig.getKylinHome();
- if (StringUtils.isNotEmpty(path)) {
- return new File(path + File.separator + "conf", fileName);
- }
- return null;
- }
-
- private String getHadoopJobConfFilePath(RealizationCapacity capaticy, boolean appendSuffix) throws IOException {
- String hadoopJobConfFile;
- if (appendSuffix) {
- hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + capaticy.toString().toLowerCase() + ".xml");
- } else {
- hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml");
- }
-
- File jobConfig = getJobConfig(hadoopJobConfFile);
- if (jobConfig == null || !jobConfig.exists()) {
- logger.warn("fail to locate " + hadoopJobConfFile + ", trying to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
- jobConfig = getJobConfig(HADOOP_JOB_CONF_FILENAME + ".xml");
- if (jobConfig == null || !jobConfig.exists()) {
- logger.error("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
- throw new RuntimeException("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
- }
- }
- return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
- }
-
- public String getHadoopJobConfFilePath(RealizationCapacity capaticy) throws IOException {
- String path = getHadoopJobConfFilePath(capaticy, true);
- if (!StringUtils.isEmpty(path)) {
- logger.info("Chosen job conf is : " + path);
- return path;
- } else {
- path = getHadoopJobConfFilePath(capaticy, false);
- if (!StringUtils.isEmpty(path)) {
- logger.info("Chosen job conf is : " + path);
- return path;
- }
- }
- return "";
- }
-
-
- public String getHiveConfFilePath() throws IOException {
- String hiveConfFile = (HIVE_CONF_FILENAME + ".xml");
-
- File jobConfig = getJobConfig(hiveConfFile);
- if (jobConfig == null || !jobConfig.exists()) {
-
- logger.error("fail to locate " + HIVE_CONF_FILENAME + ".xml");
- throw new RuntimeException("fail to locate " + HIVE_CONF_FILENAME + ".xml");
- }
- return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
- }
-
- // there should be no setters
- private final KylinConfig config;
-
- public JobEngineConfig(KylinConfig config) {
- this.config = config;
- }
-
- public KylinConfig getConfig() {
- return config;
- }
-
- public String getHdfsWorkingDirectory() {
- return config.getHdfsWorkingDirectory();
- }
-
- /**
- * @return the maxConcurrentJobLimit
- */
- public int getMaxConcurrentJobLimit() {
- return config.getMaxConcurrentJobLimit();
- }
-
- /**
- * @return the timeZone
- */
- public String getTimeZone() {
- return config.getTimeZone();
- }
-
- /**
- * @return the adminDls
- */
- public String getAdminDls() {
- return config.getAdminDls();
- }
-
- /**
- * @return the jobStepTimeout
- */
- public long getJobStepTimeout() {
- return config.getJobStepTimeout();
- }
-
- /**
- * @return the asyncJobCheckInterval
- */
- public int getAsyncJobCheckInterval() {
- return config.getYarnStatusCheckIntervalSeconds();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Object#hashCode()
- */
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((config == null) ? 0 : config.hashCode());
- return result;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- JobEngineConfig other = (JobEngineConfig) obj;
- if (config == null) {
- if (other.config != null)
- return false;
- } else if (!config.equals(other.config))
- return false;
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java b/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
deleted file mode 100644
index 8544fff..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class ExecuteException extends Exception {
-
- private static final long serialVersionUID = 5677121412192984281L;
-
- public ExecuteException() {
- }
-
- public ExecuteException(String message) {
- super(message);
- }
-
- public ExecuteException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ExecuteException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java b/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
deleted file mode 100644
index f19b0ca..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class IllegalStateTranferException extends RuntimeException {
-
- private static final long serialVersionUID = 8466551519300132702L;
-
- public IllegalStateTranferException() {
- }
-
- public IllegalStateTranferException(String message) {
- super(message);
- }
-
- public IllegalStateTranferException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public IllegalStateTranferException(Throwable cause) {
- super(cause);
- }
-
- public IllegalStateTranferException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/JobException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/JobException.java b/job/src/main/java/org/apache/kylin/job/exception/JobException.java
deleted file mode 100644
index ba4c52a..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/JobException.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- * @author xduo
- *
- */
-public class JobException extends Exception {
-
- private static final long serialVersionUID = 1L;
-
- /**
- *
- */
- public JobException() {
- super();
- }
-
- /**
- * @param message
- * @param cause
- */
- public JobException(String message, Throwable cause) {
- super(message, cause);
- }
-
- /**
- * @param message
- */
- public JobException(String message) {
- super(message);
- }
-
- /**
- * @param cause
- */
- public JobException(Throwable cause) {
- super(cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/LockException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/LockException.java b/job/src/main/java/org/apache/kylin/job/exception/LockException.java
deleted file mode 100644
index cf43ac9..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/LockException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class LockException extends Exception {
- private static final long serialVersionUID = 2072745879281754945L;
-
- public LockException() {
- }
-
- public LockException(String message) {
- super(message);
- }
-
- public LockException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public LockException(Throwable cause) {
- super(cause);
- }
-
- public LockException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java b/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
deleted file mode 100644
index 8507a53..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class PersistentException extends Exception {
- private static final long serialVersionUID = -4239863858506718998L;
-
- public PersistentException() {
- }
-
- public PersistentException(String message) {
- super(message);
- }
-
- public PersistentException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public PersistentException(Throwable cause) {
- super(cause);
- }
-
- public PersistentException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java b/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
deleted file mode 100644
index 057bd4a..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class SchedulerException extends Exception {
- private static final long serialVersionUID = 349041244824274861L;
-
- public SchedulerException() {
- }
-
- public SchedulerException(String message) {
- super(message);
- }
-
- public SchedulerException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public SchedulerException(Throwable cause) {
- super(cause);
- }
-
- public SchedulerException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
deleted file mode 100644
index be82b3a..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LogTitlePrinter;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.impl.threadpool.DefaultContext;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- */
-public abstract class AbstractExecutable implements Executable, Idempotent {
-
- protected static final String SUBMITTER = "submitter";
- protected static final String NOTIFY_LIST = "notify_list";
- protected static final String START_TIME = "startTime";
- protected static final String END_TIME = "endTime";
-
- protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
-
- private String name;
- private String id;
- private Map<String, String> params = Maps.newHashMap();
-
- protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
- public AbstractExecutable() {
- setId(UUID.randomUUID().toString());
- }
-
- protected void onExecuteStart(ExecutableContext executableContext) {
- Map<String, String> info = Maps.newHashMap();
- info.put(START_TIME, Long.toString(System.currentTimeMillis()));
- executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
- }
-
- protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
- setEndTime(System.currentTimeMillis());
- if (!isDiscarded()) {
- if (result.succeed()) {
- executableManager.updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
- } else {
- executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
- }
- } else {
- }
- }
-
- protected void onExecuteError(Throwable exception, ExecutableContext executableContext) {
- if (!isDiscarded()) {
- executableManager.addJobInfo(getId(), END_TIME, Long.toString(System.currentTimeMillis()));
- String output = null;
- if (exception != null) {
- final StringWriter out = new StringWriter();
- exception.printStackTrace(new PrintWriter(out));
- output = out.toString();
- }
- executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, output);
- } else {
- }
- }
-
- @Override
- public final ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException {
-
- //print a eye-catching title in log
- LogTitlePrinter.printTitle(this.getName());
-
- Preconditions.checkArgument(executableContext instanceof DefaultContext);
- ExecuteResult result;
- try {
- onExecuteStart(executableContext);
- result = doWork(executableContext);
- } catch (Throwable e) {
- logger.error("error running Executable", e);
- onExecuteError(e, executableContext);
- throw new ExecuteException(e);
- }
- onExecuteFinished(result, executableContext);
- return result;
- }
-
- protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException;
-
- @Override
- public void cleanup() throws ExecuteException {
-
- }
-
- @Override
- public boolean isRunnable() {
- return this.getStatus() == ExecutableState.READY;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- @Override
- public final String getId() {
- return this.id;
- }
-
- public final void setId(String id) {
- this.id = id;
- }
-
- @Override
- public final ExecutableState getStatus() {
- return executableManager.getOutput(this.getId()).getState();
- }
-
- @Override
- public final Map<String, String> getParams() {
- return Collections.unmodifiableMap(this.params);
- }
-
- public final String getParam(String key) {
- return this.params.get(key);
- }
-
- public final void setParam(String key, String value) {
- this.params.put(key, value);
- }
-
- public final void setParams(Map<String, String> params) {
- this.params.putAll(params);
- }
-
- public final long getLastModified() {
- return executableManager.getOutput(getId()).getLastModified();
- }
-
- public final void setSubmitter(String submitter) {
- setParam(SUBMITTER, submitter);
- }
-
- public final List<String> getNotifyList() {
- final String str = getParam(NOTIFY_LIST);
- if (str != null) {
- return Lists.newArrayList(StringUtils.split(str, ","));
- } else {
- return Collections.emptyList();
- }
- }
-
- public final void setNotifyList(String notifications) {
- setParam(NOTIFY_LIST, notifications);
- }
-
- public final void setNotifyList(List<String> notifications) {
- setNotifyList(StringUtils.join(notifications, ","));
- }
-
- protected Pair<String, String> formatNotifications(ExecutableContext executableContext, ExecutableState state) {
- return null;
- }
-
- protected final void notifyUserStatusChange(ExecutableContext context, ExecutableState state) {
- try {
- List<String> users = Lists.newArrayList();
- users.addAll(getNotifyList());
- final String adminDls = KylinConfig.getInstanceFromEnv().getAdminDls();
- if (null != adminDls) {
- for (String adminDl : adminDls.split(",")) {
- users.add(adminDl);
- }
- }
- if (users.isEmpty()) {
- return;
- }
- final Pair<String, String> email = formatNotifications(context, state);
- if (email == null) {
- return;
- }
- logger.info("prepare to send email to:" + users);
- logger.info("job name:" + getName());
- logger.info("submitter:" + getSubmitter());
- logger.info("notify list:" + users);
- new MailService().sendMail(users, email.getLeft(), email.getRight());
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage(), e);
- }
- }
-
- public final String getSubmitter() {
- return getParam(SUBMITTER);
- }
-
- @Override
- public final Output getOutput() {
- return executableManager.getOutput(getId());
- }
-
- protected long getExtraInfoAsLong(String key, long defaultValue) {
- return getExtraInfoAsLong(executableManager.getOutput(getId()), key, defaultValue);
- }
-
- public static long getStartTime(Output output) {
- return getExtraInfoAsLong(output, START_TIME, 0L);
- }
-
- public static long getEndTime(Output output) {
- return getExtraInfoAsLong(output, END_TIME, 0L);
- }
-
- public static long getDuration(long startTime, long endTime) {
- if (startTime == 0) {
- return 0;
- }
- if (endTime == 0) {
- return System.currentTimeMillis() - startTime;
- } else {
- return endTime - startTime;
- }
- }
-
- public static long getExtraInfoAsLong(Output output, String key, long defaultValue) {
- final String str = output.getExtra().get(key);
- if (str != null) {
- return Long.parseLong(str);
- } else {
- return defaultValue;
- }
- }
-
- protected final void addExtraInfo(String key, String value) {
- executableManager.addJobInfo(getId(), key, value);
- }
-
- public final void setStartTime(long time) {
- addExtraInfo(START_TIME, time + "");
- }
-
- public final void setEndTime(long time) {
- addExtraInfo(END_TIME, time + "");
- }
-
- public final long getStartTime() {
- return getExtraInfoAsLong(START_TIME, 0L);
- }
-
- public final long getEndTime() {
- return getExtraInfoAsLong(END_TIME, 0L);
- }
-
- public final long getDuration() {
- return getDuration(getStartTime(), getEndTime());
- }
-
- /*
- * discarded is triggered by JobService, the Scheduler is not awake of that
- *
- * */
- protected final boolean isDiscarded() {
- final ExecutableState status = executableManager.getOutput(getId()).getState();
- return status == ExecutableState.DISCARDED;
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this).add("id", getId()).add("name", getName()).add("state", getStatus()).toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
deleted file mode 100644
index d5a7aae..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import java.util.List;
-
-/**
- */
-public interface ChainedExecutable extends Executable {
-
- List<? extends AbstractExecutable> getTasks();
-
- void addTask(AbstractExecutable executable);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
deleted file mode 100644
index 6443762..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.manager.ExecutableManager;
-
-/**
- */
-public class DefaultChainedExecutable extends AbstractExecutable implements ChainedExecutable {
-
- private final List<AbstractExecutable> subTasks = Lists.newArrayList();
-
- protected final ExecutableManager jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
- public DefaultChainedExecutable(){
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- List<? extends Executable> executables = getTasks();
- final int size = executables.size();
- for (int i = 0; i < size; ++i) {
- Executable subTask = executables.get(i);
- if (subTask.isRunnable()) {
- return subTask.execute(context);
- }
- }
- return new ExecuteResult(ExecuteResult.State.SUCCEED, null);
- }
-
- @Override
- protected void onExecuteStart(ExecutableContext executableContext) {
- Map<String, String> info = Maps.newHashMap();
- info.put(START_TIME, Long.toString(System.currentTimeMillis()));
- final long startTime = getStartTime();
- if (startTime > 0) {
- jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
- } else {
- jobService.updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
- }
- }
-
- @Override
- protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
- if (isDiscarded()) {
- setEndTime(System.currentTimeMillis());
- notifyUserStatusChange(executableContext, ExecutableState.DISCARDED);
- } else if (result.succeed()) {
- List<? extends Executable> jobs = getTasks();
- boolean allSucceed = true;
- boolean hasError = false;
- for (Executable task: jobs) {
- final ExecutableState status = task.getStatus();
- if (status == ExecutableState.ERROR) {
- hasError = true;
- }
- if (status != ExecutableState.SUCCEED) {
- allSucceed = false;
- }
- }
- if (allSucceed) {
- setEndTime(System.currentTimeMillis());
- jobService.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null);
- notifyUserStatusChange(executableContext, ExecutableState.SUCCEED);
- } else if (hasError) {
- setEndTime(System.currentTimeMillis());
- jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
- notifyUserStatusChange(executableContext, ExecutableState.ERROR);
- } else {
- jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
- }
- } else {
- setEndTime(System.currentTimeMillis());
- jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
- notifyUserStatusChange(executableContext, ExecutableState.ERROR);
- }
- }
-
- @Override
- public List<AbstractExecutable> getTasks() {
- return subTasks;
- }
-
- public final AbstractExecutable getTaskByName(String name) {
- for (AbstractExecutable task : subTasks) {
- if (task.getName() != null && task.getName().equalsIgnoreCase(name)) {
- return task;
- }
- }
- return null;
- }
-
- @Override
- public void addTask(AbstractExecutable executable) {
- executable.setId(getId() + "-" + String.format("%02d", subTasks.size()));
- this.subTasks.add(executable);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java b/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java
deleted file mode 100644
index 6bc3281..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Map;
-
-/**
- */
-public class DefaultOutput implements Output {
-
- private ExecutableState state;
- private Map<String, String> extra;
- private String verboseMsg;
- private long lastModified;
-
- @Override
- public Map<String, String> getExtra() {
- return extra;
- }
-
- @Override
- public String getVerboseMsg() {
- return verboseMsg;
- }
-
- @Override
- public ExecutableState getState() {
- return state;
- }
-
- @Override
- public long getLastModified() {
- return lastModified;
- }
-
- public void setState(ExecutableState state) {
- this.state = state;
- }
-
- public void setExtra(Map<String, String> extra) {
- this.extra = extra;
- }
-
- public void setVerboseMsg(String verboseMsg) {
- this.verboseMsg = verboseMsg;
- }
-
- public void setLastModified(long lastModified) {
- this.lastModified = lastModified;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int hashCode = state.hashCode();
- hashCode = hashCode * prime + extra.hashCode();
- hashCode = hashCode * prime + verboseMsg.hashCode();
- hashCode = hashCode * prime + Long.valueOf(lastModified).hashCode();
- return hashCode;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof DefaultOutput)) {
- return false;
- }
- DefaultOutput another = ((DefaultOutput) obj);
- if (this.state != another.state) {
- return false;
- }
- if (!extra.equals(another.extra)) {
- return false;
- }
- if (this.lastModified != another.lastModified) {
- return false;
- }
- return StringUtils.equals(verboseMsg, another.verboseMsg);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/Executable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/Executable.java b/job/src/main/java/org/apache/kylin/job/execution/Executable.java
deleted file mode 100644
index a7f1358..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/Executable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import org.apache.kylin.job.exception.ExecuteException;
-
-import java.util.Map;
-
-/**
- */
-public interface Executable {
-
- String getId();
-
- String getName();
-
- ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException;
-
- ExecutableState getStatus();
-
- Output getOutput();
-
- boolean isRunnable();
-
- Map<String, String> getParams();
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java b/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
deleted file mode 100644
index e3f99ca..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import org.apache.kylin.common.KylinConfig;
-
-/**
- */
-public interface ExecutableContext {
-
- Object getSchedulerContext();
-
- KylinConfig getConfig();
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java b/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
deleted file mode 100644
index 5dad4b3..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-
-/**
- */
-public enum ExecutableState {
-
- READY,
- RUNNING,
- ERROR,
- STOPPED,
- DISCARDED,
- SUCCEED;
-
- private static Multimap<ExecutableState, ExecutableState> VALID_STATE_TRANSFER;
-
- static {
- VALID_STATE_TRANSFER = Multimaps.newSetMultimap(Maps.<ExecutableState, Collection<ExecutableState>>newEnumMap(ExecutableState.class), new Supplier<Set<ExecutableState>>() {
- @Override
- public Set<ExecutableState> get() {
- return new CopyOnWriteArraySet<ExecutableState>();
- }
- });
-
- //scheduler
- VALID_STATE_TRANSFER.put(ExecutableState.READY, ExecutableState.RUNNING);
- VALID_STATE_TRANSFER.put(ExecutableState.READY, ExecutableState.ERROR);
- //user
- VALID_STATE_TRANSFER.put(ExecutableState.READY, ExecutableState.DISCARDED);
-
- //job
- VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.READY);
- //job
- VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.SUCCEED);
- //user
- VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.DISCARDED);
- //scheduler,job
- VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.ERROR);
-
-
- VALID_STATE_TRANSFER.put(ExecutableState.STOPPED, ExecutableState.DISCARDED);
- VALID_STATE_TRANSFER.put(ExecutableState.STOPPED, ExecutableState.READY);
-
- VALID_STATE_TRANSFER.put(ExecutableState.ERROR, ExecutableState.DISCARDED);
- VALID_STATE_TRANSFER.put(ExecutableState.ERROR, ExecutableState.READY);
- }
-
- public boolean isFinalState() {
- return this == SUCCEED || this == DISCARDED;
- }
-
- public static boolean isValidStateTransfer(ExecutableState from, ExecutableState to) {
- return VALID_STATE_TRANSFER.containsEntry(from, to);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
deleted file mode 100644
index cddc0f7..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public final class ExecuteResult {
-
- public static enum State {SUCCEED, FAILED, ERROR, DISCARDED, STOPPED}
-
- private final State state;
- private final String output;
-
- public ExecuteResult(State state) {
- this(state, "");
- }
-
- public ExecuteResult(State state, String output) {
- Preconditions.checkArgument(state != null, "state cannot be null");
- this.state = state;
- this.output = output;
- }
-
- public State state() {
- return state;
- }
-
- public boolean succeed() {
- return state == State.SUCCEED;
- }
-
-
- public String output() {
- return output;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java b/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
deleted file mode 100644
index 98c950e..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import org.apache.kylin.job.exception.ExecuteException;
-
-/**
- */
-public interface Idempotent {
-
- void cleanup() throws ExecuteException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/Output.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/Output.java b/job/src/main/java/org/apache/kylin/job/execution/Output.java
deleted file mode 100644
index 4d93132..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/Output.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import java.util.Map;
-
-/**
- */
-public interface Output {
-
- Map<String, String> getExtra();
-
- String getVerboseMsg();
-
- ExecutableState getState();
-
- long getLastModified();
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
deleted file mode 100644
index c10d4e0..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop;
-
-/**
- * @author George Song (ysong1)
- *
- */
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
-import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.hadoop.util.StringUtils.formatTime;
-
-/**
- * Base class for Kylin Hadoop jobs that are run as a Hadoop {@link Tool}.
- * <p>
- * It bundles what the concrete jobs share: the command-line option
- * definitions, option parsing (delegated to {@link OptionsHelper}), job
- * classpath setup, input path expansion, and packaging of
- * <code>kylin.properties</code> plus the required metadata resources into a
- * local directory that is handed to Hadoop through the <code>tmpfiles</code>
- * configuration key.
- */
-// "static-access": commons-cli's OptionBuilder is used through chained calls on static methods.
-@SuppressWarnings("static-access")
-public abstract class AbstractHadoopJob extends Configured implements Tool {
-    protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
-
-    // Command-line options shared by the concrete jobs. The string passed to create(...) is the option name.
-    // NOTE(review): OPTION_INPUT_PATH and OPTION_PARTITION_FILE_PATH are both created as "input" -- confirm this is intended.
-    protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
-    protected static final Option OPTION_JOB_FLOW_ID = OptionBuilder.withArgName("job flow ID").hasArg().isRequired(true).withDescription("job flow ID").create("jobflowid");
-    protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
-    protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname");
-    protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
-    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
-    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
-    protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
-    protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim");
-    protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
-    protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
-    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
-    protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
-    protected static final Option OPTION_KEY_COLUMN_PERCENTAGE = OptionBuilder.withArgName("rowkey column percentage").hasArg().isRequired(true).withDescription("Percentage of row key columns").create("columnpercentage");
-    protected static final Option OPTION_KEY_SPLIT_NUMBER = OptionBuilder.withArgName("key split number").hasArg().isRequired(true).withDescription("Number of key split range").create("splitnumber");
-
-    protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled");
-    protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName("statisticsoutput").hasArg().isRequired(false).withDescription("Statistics output").create("statisticsoutput");
-    protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName("statisticssamplingpercent").hasArg().isRequired(false).withDescription("Statistics sampling percentage").create("statisticssamplingpercent");
-
-    /** Configuration key that holds the MapReduce application classpath. */
-    private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
-
-    // name and description are not read or written by this class; presumably maintained by subclasses.
-    protected String name;
-    protected String description;
-    /** When true, {@link #waitForCompletion(Job)} only submits the job and returns without waiting. */
-    protected boolean isAsync = false;
-    protected OptionsHelper optionsHelper = new OptionsHelper();
-
-    /**
-     * The Hadoop job handled by this instance. It is never assigned in this
-     * class (presumably subclasses set it); the job-dependent methods below
-     * check it for null.
-     */
-    protected Job job;
-
-    /** Creates the job with the configuration from {@link HadoopUtil#newHadoopJobConfiguration()}. */
-    public AbstractHadoopJob() {
-        super(HadoopUtil.newHadoopJobConfiguration());
-    }
-
-    /**
-     * Parses the command-line arguments against the given options.
-     *
-     * @param options the options accepted by the concrete job
-     * @param args the raw command-line arguments
-     * @throws ParseException if the arguments do not match the options
-     */
-    protected void parseOptions(Options options, String[] args) throws ParseException {
-        optionsHelper.parseOptions(options, args);
-    }
-
-    /**
-     * Prints the usage of the given options, titled with this job's simple class name.
-     *
-     * @param options the options to describe
-     */
-    public void printUsage(Options options) {
-        optionsHelper.printUsage(getClass().getSimpleName(), options);
-    }
-
-    /** @return the options as reported by the {@link OptionsHelper} */
-    public Option[] getOptions() {
-        return optionsHelper.getOptions();
-    }
-
-    /** @return the options rendered as a single string by the {@link OptionsHelper} */
-    public String getOptionsAsString() {
-        return optionsHelper.getOptionsAsString();
-    }
-
-    /**
-     * @param option the option to look up
-     * @return the value the {@link OptionsHelper} holds for the option
-     */
-    protected String getOptionValue(Option option) {
-        return optionsHelper.getOptionValue(option);
-    }
-
-    /**
-     * @param option the option to look up
-     * @return whether the {@link OptionsHelper} reports the option as present
-     */
-    protected boolean hasOption(Option option) {
-        return optionsHelper.hasOption(option);
-    }
-
-    /**
-     * Runs the given job. In async mode the job is only submitted; otherwise
-     * this call blocks until the job finishes and logs the elapsed time.
-     *
-     * @param job the job to run
-     * @return 0 in async mode or when the job succeeded, 1 when it failed
-     * @throws IOException if submitting or querying the job fails
-     * @throws InterruptedException if the wait is interrupted
-     * @throws ClassNotFoundException if a class needed by the job cannot be found
-     */
-    protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
-        int retVal = 0;
-        long start = System.nanoTime();
-        if (isAsync) {
-            job.submit();
-        } else {
-            job.waitForCompletion(true);
-            // query the outcome once and reuse it for both the return code and the log line
-            boolean succeeded = job.isSuccessful();
-            retVal = succeeded ? 0 : 1;
-            // formatTime expects milliseconds, hence the nano -> milli conversion
-            logger.debug("Job '" + job.getJobName() + "' finished " + (succeeded ? "successfully in " : "with failures. Time taken ") + formatTime((System.nanoTime() - start) / 1000000L));
-        }
-        return retVal;
-    }
-
-    /**
-     * Runs the tool through {@link ToolRunner} and terminates the JVM with the
-     * tool's exit code, or with exit code 5 if the run throws.
-     *
-     * @param job the tool to run
-     * @param args the command-line arguments passed to the tool
-     */
-    protected static void runJob(Tool job, String[] args) {
-        try {
-            int exitCode = ToolRunner.run(job, args);
-            System.exit(exitCode);
-        } catch (Exception e) {
-            e.printStackTrace(System.err);
-            System.exit(5);
-        }
-    }
-
-    /**
-     * Sets the job jar and extends the job's
-     * <code>mapreduce.application.classpath</code> with the entries of the
-     * <code>kylin.hbase.dependency</code> and <code>kylin.hive.dependency</code>
-     * system properties. A property that is not set contributes nothing.
-     *
-     * @param job the job whose jar and classpath are configured
-     */
-    protected void setJobClasspath(Job job) {
-        String jarPath = KylinConfig.getInstanceFromEnv().getKylinJobJarPath();
-        File jarFile = new File(jarPath);
-        if (jarFile.exists()) {
-            job.setJar(jarPath);
-            logger.info("append job jar: " + jarPath);
-        } else {
-            // configured jar is not on the local disk; let Hadoop locate the jar containing this class
-            job.setJarByClass(this.getClass());
-        }
-
-        String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
-        String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
-        logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " and kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
-
-        Configuration jobConf = job.getConfiguration();
-        String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
-        if (classpath == null || classpath.length() == 0) {
-            logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
-            classpath = getDefaultMapRedClasspath();
-            logger.info("The default mapred classpath is: " + classpath);
-        }
-
-        if (kylinHBaseDependency != null) {
-            // yarn classpath is comma separated
-            kylinHBaseDependency = kylinHBaseDependency.replace(":", ",");
-            classpath = classpath + "," + kylinHBaseDependency;
-        }
-
-        if (kylinHiveDependency != null) {
-            // yarn classpath is comma separated
-            kylinHiveDependency = kylinHiveDependency.replace(":", ",");
-            classpath = classpath + "," + kylinHiveDependency;
-        }
-
-        // Both dependencies are already part of classpath at this point. Appending the hive
-        // dependency again here would duplicate it, or add the literal text "null" when it is unset.
-        jobConf.set(MAP_REDUCE_CLASSPATH, classpath);
-        logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
-    }
-
-    /**
-     * Runs <code>mapred classpath</code> through Kylin's CLI command executor
-     * and converts its colon separated output into a comma separated list.
-     *
-     * @return the comma separated classpath, or an empty string if the command failed
-     */
-    private String getDefaultMapRedClasspath() {
-        String classpath = "";
-        try {
-            CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
-            ShellCmdOutput output = new ShellCmdOutput();
-            executor.execute("mapred classpath", output);
-
-            classpath = output.getOutput().trim().replace(':', ',');
-        } catch (IOException e) {
-            // best effort: the caller continues with an empty default classpath
-            logger.error("Failed to run: 'mapred classpath'.", e);
-        }
-
-        return classpath;
-    }
-
-    /**
-     * Adds the comma separated input paths to the job. A path ending in
-     * <code>/*</code> is expanded: every sub-directory whose name does not
-     * start with <code>_</code> is added recursively; if there is no such
-     * sub-directory and the directory is not empty, the directory itself is
-     * added.
-     *
-     * @param input comma separated list of input paths
-     * @param job the job that receives the input paths
-     * @throws IOException if the file system cannot be accessed
-     */
-    public void addInputDirs(String input, Job job) throws IOException {
-        for (String inp : StringSplitter.split(input, ",")) {
-            inp = inp.trim();
-            if (inp.endsWith("/*")) {
-                inp = inp.substring(0, inp.length() - 2);
-                FileSystem fs = FileSystem.get(job.getConfiguration());
-                Path path = new Path(inp);
-                FileStatus[] fileStatuses = fs.listStatus(path);
-                boolean hasDir = false;
-                for (FileStatus stat : fileStatuses) {
-                    if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
-                        hasDir = true;
-                        addInputDirs(stat.getPath().toString(), job);
-                    }
-                }
-                if (fileStatuses.length > 0 && !hasDir) {
-                    // no eligible sub-directory: the directory itself is the input
-                    addInputDirs(path.toString(), job);
-                }
-            } else {
-                logger.debug("Add input " + inp);
-                FileInputFormat.addInputPath(job, new Path(inp));
-            }
-        }
-    }
-
-    /**
-     * Points Kylin at the relative directory <code>meta</code>: sets the
-     * {@link KylinConfig#KYLIN_CONF} system property to its absolute path and
-     * the metadata URL of the environment config to its canonical path.
-     * Presumably called where the directory attached by
-     * <code>attachKylinPropsAndMetadata</code> has been localized -- confirm with callers.
-     *
-     * @return the Kylin config obtained from the environment
-     * @throws IOException if the canonical path cannot be resolved
-     */
-    public static KylinConfig loadKylinPropsAndMetadata() throws IOException {
-        File metaDir = new File("meta");
-        System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
-        logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
-        return kylinConfig;
-    }
-
-    /**
-     * Attaches <code>kylin.properties</code> and the metadata of a single table to the job configuration.
-     *
-     * @param table the table whose resource is dumped
-     * @param conf the job configuration to update
-     * @throws IOException if the metadata cannot be written
-     */
-    protected void attachKylinPropsAndMetadata(TableDesc table, Configuration conf) throws IOException {
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(table.getResourcePath());
-        attachKylinPropsAndMetadata(dumpList, conf);
-    }
-
-    /**
-     * Attaches <code>kylin.properties</code> and the metadata needed for a
-     * cube to the job configuration: the cube, its model and descriptor, all
-     * tables of the model and the dictionaries of every segment.
-     *
-     * @param cube the cube whose metadata is dumped
-     * @param conf the job configuration to update
-     * @throws IOException if the metadata cannot be written
-     */
-    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        // write cube / model_desc / cube_desc / dict / table
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(cube.getResourcePath());
-        dumpList.add(cube.getDescriptor().getModel().getResourcePath());
-        dumpList.add(cube.getDescriptor().getResourcePath());
-
-        for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = metaMgr.getTableDesc(tableName);
-            dumpList.add(table.getResourcePath());
-        }
-        for (CubeSegment segment : cube.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
-
-        attachKylinPropsAndMetadata(dumpList, conf);
-    }
-
-    /**
-     * Attaches <code>kylin.properties</code> and the metadata needed for an
-     * inverted index to the job configuration: the II, its model and
-     * descriptor, all tables of the model and the dictionaries of every segment.
-     *
-     * @param ii the inverted index whose metadata is dumped
-     * @param conf the job configuration to update
-     * @throws IOException if the metadata cannot be written
-     */
-    protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        // write II / model_desc / II_desc / dict / table
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(ii.getResourcePath());
-        dumpList.add(ii.getDescriptor().getModel().getResourcePath());
-        dumpList.add(ii.getDescriptor().getResourcePath());
-
-        for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = metaMgr.getTableDesc(tableName);
-            dumpList.add(table.getResourcePath());
-        }
-        for (IISegment segment : ii.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
-
-        attachKylinPropsAndMetadata(dumpList, conf);
-    }
-
-    /**
-     * Writes <code>kylin.properties</code> and the listed resources into a
-     * fresh local <code>meta</code> directory and registers that directory
-     * under the <code>tmpfiles</code> key of the configuration.
-     *
-     * @param dumpList resource paths to copy from the metadata store
-     * @param conf the job configuration to update
-     * @throws IOException if the directory cannot be created or a resource cannot be written
-     */
-    private void attachKylinPropsAndMetadata(ArrayList<String> dumpList, Configuration conf) throws IOException {
-        File tmp = File.createTempFile("kylin_job_meta", "");
-        tmp.delete(); // we need a directory, so delete the file first
-
-        File metaDir = new File(tmp, "meta");
-        // fail here with a clear message instead of with an obscure write error further down
-        if (!metaDir.mkdirs() && !metaDir.isDirectory()) {
-            throw new IOException("Failed to create local metadata directory " + metaDir.getAbsolutePath());
-        }
-        metaDir.getParentFile().deleteOnExit();
-
-        // write kylin.properties
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        File kylinPropsFile = new File(metaDir, "kylin.properties");
-        kylinConfig.writeProperties(kylinPropsFile);
-
-        // write resources
-        dumpResources(kylinConfig, metaDir, dumpList);
-
-        // hadoop distributed cache
-        conf.set("tmpfiles", "file:///" + OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()));
-    }
-
-    /**
-     * Copies the listed resources, with their timestamps, from the metadata
-     * store of the given config into a store rooted at the local directory.
-     *
-     * @param kylinConfig config of the source metadata store
-     * @param metaDir local target directory
-     * @param dumpList resource paths to copy
-     * @throws IOException if a resource cannot be read or written
-     * @throws IllegalStateException if a listed resource does not exist in the source store
-     */
-    private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
-        ResourceStore from = ResourceStore.getStore(kylinConfig);
-        KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
-        ResourceStore to = ResourceStore.getStore(localConfig);
-        for (String path : dumpList) {
-            InputStream in = from.getResource(path);
-            if (in == null)
-                throw new IllegalStateException("No resource found at -- " + path);
-            try {
-                long ts = from.getResourceTimestamp(path);
-                to.putResource(path, in, ts);
-                // no extra log here: it would duplicate the one in ResourceStore
-            } finally {
-                // release the source stream even when the copy fails
-                in.close();
-            }
-        }
-    }
-
-    /**
-     * Deletes the given path through {@link HadoopUtil#deletePath(Configuration, Path)}.
-     *
-     * @param conf the Hadoop configuration to use
-     * @param path the path to delete
-     * @throws IOException if the deletion fails
-     */
-    protected void deletePath(Configuration conf, Path path) throws IOException {
-        HadoopUtil.deletePath(conf, path);
-    }
-
-    /**
-     * Sums the length of all input splits of the current job.
-     *
-     * @return the total map input size in megabytes
-     * @throws JobException if no job is set
-     * @throws IllegalArgumentException if the splits add up to 0 bytes
-     * @throws ClassNotFoundException if the input format class cannot be found
-     * @throws IOException if the splits cannot be computed
-     * @throws InterruptedException if computing the splits is interrupted
-     */
-    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        if (job == null) {
-            throw new JobException("Job is null");
-        }
-
-        long mapInputBytes = 0;
-        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
-        for (InputSplit split : input.getSplits(job)) {
-            mapInputBytes += split.getLength();
-        }
-        if (mapInputBytes == 0) {
-            throw new IllegalArgumentException("Map input splits are 0 bytes, something is wrong!");
-        }
-        return (double) mapInputBytes / 1024 / 1024;
-    }
-
-    /**
-     * @return the number of input splits of the current job
-     * @throws JobException if no job is set
-     * @throws ClassNotFoundException if the input format class cannot be found
-     * @throws IOException if the splits cannot be computed
-     * @throws InterruptedException if computing the splits is interrupted
-     */
-    protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
-        if (job == null) {
-            throw new JobException("Job is null");
-        }
-        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
-        return input.getSplits(job).size();
-    }
-
-    /**
-     * Kills the current job; does nothing when no job is set.
-     *
-     * @throws JobException wrapping the IOException raised while killing the job
-     */
-    public void kill() throws JobException {
-        if (job != null) {
-            try {
-                job.killJob();
-            } catch (IOException e) {
-                throw new JobException(e);
-            }
-        }
-    }
-
-    /**
-     * Collects identifying information of the current job.
-     *
-     * @return a map holding the job id under {@link JobInstance#MR_JOB_ID} and the
-     *         tracking URL under {@link JobInstance#YARN_APP_URL}, each only when available
-     * @throws JobException if no job is set
-     */
-    public Map<String, String> getInfo() throws JobException {
-        if (job != null) {
-            Map<String, String> status = new HashMap<String, String>();
-            if (null != job.getJobID()) {
-                status.put(JobInstance.MR_JOB_ID, job.getJobID().toString());
-            }
-            if (null != job.getTrackingURL()) {
-                status.put(JobInstance.YARN_APP_URL, job.getTrackingURL().toString());
-            }
-
-            return status;
-        } else {
-            throw new JobException("Job is null");
-        }
-    }
-
-    /**
-     * @return the counters of the current job
-     * @throws JobException if no job is set, or wrapping the IOException raised while fetching the counters
-     */
-    public Counters getCounters() throws JobException {
-        if (job != null) {
-            try {
-                return job.getCounters();
-            } catch (IOException e) {
-                throw new JobException(e);
-            }
-        } else {
-            throw new JobException("Job is null");
-        }
-    }
-
-    /**
-     * @param isAsync true to make {@link #waitForCompletion(Job)} submit the job without waiting for it
-     */
-    public void setAsync(boolean isAsync) {
-        this.isAsync = isAsync;
-    }
-
-    /** @return the current Hadoop job, or null if none has been set */
-    public Job getJob() {
-        return this.job;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
index 7f549d0..2e3cdb0 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -32,10 +32,10 @@ import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
index 0670178..de10e65 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
/**
* This hadoop job will scan all rows of the hive table and then calculate the cardinality on each column.