You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:43 UTC
[41/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job
module into 'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
new file mode 100644
index 0000000..932da38
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.LookupDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+
+public class JoinedFlatTable {
+
+ /** Alias under which the fact table appears in the generated SQL. */
+ public static final String FACT_TABLE_ALIAS = "FACT_TABLE";
+
+ /**
+  * Prefix of the alias assigned to each joined lookup table; a running number is appended to it.
+  * NOTE: "ALAIS" looks like a misspelling of "ALIAS"; the identifier is public, so it is left unchanged.
+  */
+ public static final String LOOKUP_TABLE_ALAIS_PREFIX = "LOOKUP_";
+
+ /**
+  * Builds the directory of the intermediate flat table below the given storage directory.
+  *
+  * @param intermediateTableDesc descriptor supplying the table name
+  * @param storageDfsDir parent directory the table directory is placed under
+  * @return {@code storageDfsDir}, a slash, then the table name
+  */
+ public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+     final StringBuilder path = new StringBuilder();
+     path.append(storageDfsDir);
+     path.append("/");
+     path.append(intermediateTableDesc.getTableName());
+     return path.toString();
+ }
+
+ /**
+  * Generates the Hive DDL that creates the intermediate flat table as an external table.
+  *
+  * @param intermediateTableDesc descriptor supplying the table name and its column list
+  * @param storageDfsDir parent directory; the table LOCATION is this directory plus the table name
+  * @return the CREATE EXTERNAL TABLE statement, terminated by ";" and a newline
+  */
+ public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+     final StringBuilder out = new StringBuilder();
+     out.append("CREATE EXTERNAL TABLE IF NOT EXISTS ").append(intermediateTableDesc.getTableName()).append("\n");
+     out.append("(").append("\n");
+
+     // Column definitions, one per line; every line but the first starts with a comma.
+     String separator = "";
+     for (int idx = 0; idx < intermediateTableDesc.getColumnList().size(); idx++) {
+         final IntermediateColumnDesc column = intermediateTableDesc.getColumnList().get(idx);
+         out.append(separator);
+         out.append(colName(column.getCanonicalName()));
+         out.append(" ");
+         out.append(getHiveDataType(column.getDataType()));
+         out.append("\n");
+         separator = ",";
+     }
+     out.append(")").append("\n");
+
+     out.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'").append("\n");
+     out.append("STORED AS SEQUENCEFILE").append("\n");
+     out.append("LOCATION '").append(getTableDir(intermediateTableDesc, storageDfsDir)).append("';").append("\n");
+     // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
+     // ";\n");
+     return out.toString();
+ }
+
+ /**
+  * Generates the Hive statement that drops the intermediate flat table if it exists.
+  *
+  * @param intermediateTableDesc descriptor supplying the table name
+  * @return the DROP TABLE statement, terminated by ";" and a newline
+  */
+ public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+     final String tableName = intermediateTableDesc.getTableName();
+     return "DROP TABLE IF EXISTS " + tableName + ";" + "\n";
+ }
+
+ /**
+  * Generates the Hive script that fills the intermediate flat table.
+  * <p>
+  * If the Hive configuration file named by the engine config exists, each of its properties
+  * (except "tmpjars") is emitted first as a {@code SET name=value;} line. The script ends with an
+  * INSERT OVERWRITE TABLE built from {@link #generateSelectDataStatement(IJoinedFlatTableDesc)}.
+  *
+  * @param intermediateTableDesc descriptor of the flat table to fill
+  * @param engineConfig supplies the path of the Hive configuration file
+  * @return the SET lines (if any) followed by the INSERT statement
+  * @throws IOException if the configuration file cannot be read or parsed
+  */
+ public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) throws IOException {
+     StringBuilder sql = new StringBuilder();
+
+     File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
+     if (hadoopPropertiesFile.exists()) {
+         appendHiveConfSettings(hadoopPropertiesFile, sql);
+     }
+
+     sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
+
+     return sql.toString();
+ }
+
+ /** Parses the XML configuration file and appends one SET line per property, skipping "tmpjars". */
+ private static void appendHiveConfSettings(File confFile, StringBuilder sql) throws IOException {
+     Document doc;
+     try {
+         DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+         doc = builder.parse(confFile);
+     } catch (ParserConfigurationException e) {
+         throw new IOException(e);
+     } catch (SAXException e) {
+         throw new IOException(e);
+     }
+
+     NodeList properties = doc.getElementsByTagName("property");
+     for (int i = 0; i < properties.getLength(); i++) {
+         org.w3c.dom.Node property = properties.item(i);
+         // Read name and value from THIS property's children. Indexing document-wide
+         // <name>/<value> lists mis-pairs entries as soon as one property lacks a value.
+         String name = childText(property, "name");
+         if (name == null || name.equals("tmpjars")) {
+             continue;
+         }
+         String value = childText(property, "value");
+         // An empty <value/> has no text child; emit an empty setting instead of failing.
+         sql.append("SET " + name + "=" + (value == null ? "" : value) + ";").append("\n");
+     }
+ }
+
+ /** Returns the first text value of the first direct child named tagName, or null if there is none. */
+ private static String childText(org.w3c.dom.Node parent, String tagName) {
+     for (org.w3c.dom.Node child = parent.getFirstChild(); child != null; child = child.getNextSibling()) {
+         if (tagName.equals(child.getNodeName())) {
+             org.w3c.dom.Node text = child.getFirstChild();
+             return text == null ? null : text.getNodeValue();
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Generates the SELECT statement that reads the flat table's columns from the joined source tables.
+  * <p>
+  * Each column is written as {@code alias.column} on its own line, followed by the FROM/JOIN part
+  * and, where applicable, the WHERE part.
+  *
+  * @param intermediateTableDesc descriptor supplying the data model and the column list
+  * @return the SELECT statement without a terminating semicolon
+  */
+ public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+     final StringBuilder query = new StringBuilder();
+     query.append("SELECT").append("\n");
+
+     final Map<String, String> aliases = buildTableAliasMap(intermediateTableDesc.getDataModel());
+     for (int idx = 0; idx < intermediateTableDesc.getColumnList().size(); idx++) {
+         final IntermediateColumnDesc column = intermediateTableDesc.getColumnList().get(idx);
+         if (idx != 0) {
+             query.append(",");
+         }
+         final String alias = aliases.get(column.getTableName());
+         query.append(alias).append(".").append(column.getColumnName()).append("\n");
+     }
+
+     appendJoinStatement(intermediateTableDesc, query, aliases);
+     appendWhereStatement(intermediateTableDesc, query, aliases);
+     return query.toString();
+ }
+
+ private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) {
+ Map<String, String> tableAliasMap = new HashMap<String, String>();
+
+ tableAliasMap.put(dataModelDesc.getFactTable().toUpperCase(), FACT_TABLE_ALIAS);
+
+ int i = 1;
+ for (LookupDesc lookupDesc: dataModelDesc.getLookups()) {
+ JoinDesc join = lookupDesc.getJoin();
+ if (join != null) {
+ tableAliasMap.put(lookupDesc.getTable().toUpperCase(), LOOKUP_TABLE_ALAIS_PREFIX + i);
+ i++;
+ }
+
+ }
+ return tableAliasMap;
+ }
+
+ /**
+  * Appends the FROM clause for the fact table and one JOIN clause per distinct lookup table.
+  * <p>
+  * Lookups without a join, or with an empty join type, are skipped. A lookup table that was
+  * already joined is not joined again.
+  *
+  * @param intermediateTableDesc descriptor supplying the data model
+  * @param sql builder the clauses are appended to
+  * @param tableAliasMap aliases keyed by upper-cased table name
+  * @throws RuntimeException if a join has a different number of primary and foreign key columns
+  */
+ private static void appendJoinStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+     Set<String> dimTableCache = new HashSet<String>();
+
+     DataModelDesc dataModelDesc = intermediateTableDesc.getDataModel();
+     String factTableName = dataModelDesc.getFactTable();
+     // The alias map is keyed by upper-cased names, so the lookup key must be upper-cased the
+     // same way; otherwise a non-upper-case table name yields "as null" in the SQL.
+     String factTableAlias = tableAliasMap.get(factTableName.toUpperCase());
+     sql.append("FROM " + factTableName + " as " + factTableAlias + " \n");
+
+     for (LookupDesc lookupDesc : dataModelDesc.getLookups()) {
+         JoinDesc join = lookupDesc.getJoin();
+         if (join == null || join.getType().equals("")) {
+             continue;
+         }
+         String dimTableName = lookupDesc.getTable();
+         if (dimTableCache.contains(dimTableName)) {
+             continue; // this lookup table is already joined
+         }
+
+         TblColRef[] pk = join.getPrimaryKeyColumns();
+         TblColRef[] fk = join.getForeignKeyColumns();
+         if (pk.length != fk.length) {
+             throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
+         }
+
+         String joinType = join.getType().toUpperCase();
+         String dimTableAlias = tableAliasMap.get(dimTableName.toUpperCase());
+         sql.append(joinType + " JOIN " + dimTableName + " as " + dimTableAlias + "\n");
+         sql.append("ON ");
+         for (int i = 0; i < pk.length; i++) {
+             if (i > 0) {
+                 sql.append(" AND ");
+             }
+             sql.append(factTableAlias + "." + fk[i].getName() + " = " + dimTableAlias + "." + pk[i].getName());
+         }
+         sql.append("\n");
+
+         dimTableCache.add(dimTableName);
+     }
+ }
+
+ private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+ if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
+ return;//TODO: for now only cube segments support filter and partition
+ }
+ CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
+
+ boolean hasCondition = false;
+ StringBuilder whereBuilder = new StringBuilder();
+ whereBuilder.append("WHERE");
+
+ CubeDesc cubeDesc = desc.getCubeDesc();
+ DataModelDesc model = cubeDesc.getModel();
+
+ if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) {
+ whereBuilder.append(" (").append(model.getFilterCondition()).append(") ");
+ hasCondition = true;
+ }
+
+ CubeSegment cubeSegment = desc.getCubeSegment();
+
+ if (null != cubeSegment) {
+ PartitionDesc partDesc = model.getPartitionDesc();
+ long dateStart = cubeSegment.getDateRangeStart();
+ long dateEnd = cubeSegment.getDateRangeEnd();
+
+ if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) {
+ whereBuilder.append(hasCondition ? " AND (" : " (");
+ whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd, tableAliasMap));
+ whereBuilder.append(")\n");
+ hasCondition = true;
+ }
+ }
+
+ if (hasCondition) {
+ sql.append(whereBuilder.toString());
+ }
+ }
+
+ private static String colName(String canonicalColName) {
+ return canonicalColName.replace(".", "_");
+ }
+
+ private static String getHiveDataType(String javaDataType) {
+ String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType;
+ hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType;
+
+ return hiveDataType.toLowerCase();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
new file mode 100644
index 0000000..2ed2fc2
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.lock.JobLock;
+
+/**
+ * Contract of a job scheduler for executables of type {@code T}.
+ * NOTE(review): no implementation is visible here; the member descriptions below restate the
+ * signatures and should be confirmed against the implementing classes.
+ */
+public interface Scheduler<T extends Executable> {
+
+ /**
+  * Initializes the scheduler with the given job engine configuration and job lock.
+  *
+  * @throws SchedulerException if initialization fails
+  */
+ void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException;
+
+ /**
+  * Shuts the scheduler down.
+  *
+  * @throws SchedulerException if the shutdown fails
+  */
+ void shutdown() throws SchedulerException;
+
+ /**
+  * Stops the given executable.
+  *
+  * @return presumably whether the executable was stopped -- TODO confirm with implementations
+  * @throws SchedulerException if stopping fails
+  */
+ boolean stop(T executable) throws SchedulerException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
new file mode 100644
index 0000000..29b5324
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+/**
+ * Base class for command outputs that treats every logged message as command output.
+ */
+public abstract class BaseCommandOutput implements ICommandOutput {
+
+    /**
+     * Forwards the logged message to {@link #appendOutput(String)}.
+     *
+     * @param message the message to record
+     */
+    @Override
+    public void log(String message) {
+        appendOutput(message);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
new file mode 100644
index 0000000..6cab6a3
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.common.util.Logger;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+
+/**
+ * Collects the result of a job command: its step status, its accumulated output text and its
+ * exit code. It is also a {@link Logger}, so it can be handed to code that logs messages.
+ *
+ * @author xjiang
+ *
+ */
+public interface ICommandOutput extends Logger {
+
+ /** Sets the status of the job step. */
+ public void setStatus(JobStepStatusEnum status);
+
+ /** Returns the status of the job step. */
+ public JobStepStatusEnum getStatus();
+
+ /** Adds a message to the collected output. */
+ public void appendOutput(String message);
+
+ /** Returns the output collected so far. */
+ public String getOutput();
+
+ /** Sets the exit code of the command. */
+ public void setExitCode(int exitCode);
+
+ /** Returns the exit code of the command. */
+ public int getExitCode();
+
+ /** Resets this output; presumably back to its initial state -- TODO confirm with implementations. */
+ public void reset();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
new file mode 100644
index 0000000..5a47173
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.job.exception.JobException;
+
+/**
+ * A command that can be executed as part of a job and cancelled.
+ *
+ * @author xjiang
+ *
+ */
+public interface IJobCommand {
+
+ /**
+  * Executes the command.
+  *
+  * @return the output object of the command
+  * @throws JobException if the execution fails
+  */
+ public ICommandOutput execute() throws JobException;
+
+ /**
+  * Cancels the command.
+  *
+  * @throws JobException if the cancellation fails
+  */
+ public void cancel() throws JobException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
new file mode 100644
index 0000000..6a718fc
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.*;
+
+/**
+ * Job command that runs a shell command through a {@link CliCommandExecutor} on a background
+ * thread, either waiting for the result (synchronous) or returning immediately (asynchronous).
+ *
+ * @author xjiang
+ *
+ */
+public class ShellCmd implements IJobCommand {
+
+    private static final Logger log = LoggerFactory.getLogger(ShellCmd.class);
+
+    private final String executeCommand;
+    private final ICommandOutput output;
+    private final boolean isAsync;
+    private final CliCommandExecutor cliCommandExecutor;
+
+    // Written by execute() and read by cancel(), possibly on different threads.
+    private volatile FutureTask<Integer> future;
+
+    private ShellCmd(String executeCmd, ICommandOutput out, String host, int port, String user, String password, boolean async) {
+        this.executeCommand = executeCmd;
+        this.output = out;
+        this.cliCommandExecutor = new CliCommandExecutor();
+        this.cliCommandExecutor.setRunAtRemote(host, port, user, password);
+        this.isAsync = async;
+    }
+
+    /**
+     * Creates a command whose output is collected in a new {@link ShellCmdOutput}.
+     *
+     * @param executeCmd the shell command line
+     * @param host host passed to {@code CliCommandExecutor.setRunAtRemote}
+     * @param port port passed to {@code CliCommandExecutor.setRunAtRemote}
+     * @param user user passed to {@code CliCommandExecutor.setRunAtRemote}
+     * @param password password passed to {@code CliCommandExecutor.setRunAtRemote}
+     * @param async if true, {@link #execute()} returns without waiting for the command
+     */
+    public ShellCmd(String executeCmd, String host, int port, String user, String password, boolean async) {
+        this(executeCmd, new ShellCmdOutput(), host, port, user, password, async);
+    }
+
+    /**
+     * Starts the command on a background thread.
+     * <p>
+     * In synchronous mode this waits for completion and records status and exit code on the
+     * output: exit code 0 gives FINISHED, a cancellation gives DISCARDED with exit code -2,
+     * anything else gives ERROR. In asynchronous mode the output is returned immediately.
+     *
+     * @return the output object of this command
+     * @throws JobException if waiting for the command fails or the command throws
+     */
+    @Override
+    public ICommandOutput execute() throws JobException {
+
+        final FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
+            @Override
+            public Integer call() throws JobException, IOException {
+                return executeCommand(executeCommand);
+            }
+        });
+        future = task;
+
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            executor.execute(task);
+        } finally {
+            // shutdown() still lets the submitted task run. Doing it here rather than inside the
+            // task means the worker thread is released even if the task is cancelled before it starts.
+            executor.shutdown();
+        }
+
+        int exitCode = -1;
+        if (!isAsync) {
+            try {
+                exitCode = task.get();
+                log.info("finish executing");
+            } catch (CancellationException e) {
+                log.debug("Command is cancelled");
+                exitCode = -2;
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers can still observe the interruption.
+                Thread.currentThread().interrupt();
+                throw new JobException("Error when execute job " + executeCommand, e);
+            } catch (Exception e) {
+                throw new JobException("Error when execute job " + executeCommand, e);
+            } finally {
+                if (exitCode == 0) {
+                    output.setStatus(JobStepStatusEnum.FINISHED);
+                } else if (exitCode == -2) {
+                    output.setStatus(JobStepStatusEnum.DISCARDED);
+                } else {
+                    output.setStatus(JobStepStatusEnum.ERROR);
+                }
+                output.setExitCode(exitCode);
+            }
+        }
+        return output;
+    }
+
+    /**
+     * Resets the output, marks it RUNNING and runs the command through the CLI executor.
+     *
+     * @param command the shell command line
+     * @return the first element of the pair returned by the CLI executor, used as the exit code
+     */
+    protected int executeCommand(String command) throws JobException, IOException {
+        output.reset();
+        output.setStatus(JobStepStatusEnum.RUNNING);
+        return cliCommandExecutor.execute(command, output).getFirst();
+    }
+
+    /**
+     * Cancels the running command, interrupting its thread. Does nothing if
+     * {@link #execute()} has not been called yet.
+     */
+    @Override
+    public void cancel() throws JobException {
+        final FutureTask<Integer> task = future;
+        if (task != null) {
+            task.cancel(true);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
new file mode 100644
index 0000000..ebcad47
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+
+/**
+ * In-memory command output: keeps the output text, the exit code and the step status of a
+ * shell command.
+ *
+ * @author xjiang
+ *
+ */
+public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
+
+    protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
+
+    protected StringBuilder output;
+    protected int exitCode;
+    protected JobStepStatusEnum status;
+
+    /** Creates an output with empty text, exit code -1 and status NEW. */
+    public ShellCmdOutput() {
+        restoreInitialState();
+    }
+
+    /** Discards everything collected so far and returns to the state of a new instance. */
+    @Override
+    public void reset() {
+        restoreInitialState();
+    }
+
+    /** Appends the message plus the platform line separator, and logs it at debug level. */
+    @Override
+    public void appendOutput(String message) {
+        final String lineEnd = System.getProperty("line.separator");
+        this.output.append(message);
+        this.output.append(lineEnd);
+        log.debug(message);
+    }
+
+    @Override
+    public String getOutput() {
+        return this.output.toString();
+    }
+
+    @Override
+    public void setStatus(JobStepStatusEnum s) {
+        status = s;
+    }
+
+    @Override
+    public JobStepStatusEnum getStatus() {
+        return this.status;
+    }
+
+    @Override
+    public void setExitCode(int code) {
+        this.exitCode = code;
+    }
+
+    @Override
+    public int getExitCode() {
+        return this.exitCode;
+    }
+
+    // Shared by the constructor and reset().
+    private void restoreInitialState() {
+        this.status = JobStepStatusEnum.NEW;
+        this.exitCode = -1;
+        this.output = new StringBuilder();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java b/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
new file mode 100644
index 0000000..1cda348
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.common;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+/**
+ * Small wrapper around commons-cli: parses the arguments once and answers queries about the
+ * parsed options. {@link #parseOptions(Options, String[])} must be called before the query methods.
+ */
+public class OptionsHelper {
+    private CommandLine commandLine;
+
+    /**
+     * Parses the arguments with a {@link GnuParser} and keeps the result for later queries.
+     *
+     * @throws ParseException if the arguments do not match the options
+     */
+    public void parseOptions(Options options, String[] args) throws ParseException {
+        final CommandLineParser gnuParser = new GnuParser();
+        this.commandLine = gnuParser.parse(options, args);
+    }
+
+    /** Returns the options found on the parsed command line. */
+    public Option[] getOptions() {
+        return this.commandLine.getOptions();
+    }
+
+    /**
+     * Renders the parsed options as text: each option is written as a space followed by its
+     * short name, plus "=" and its value when the option takes an argument.
+     */
+    public String getOptionsAsString() {
+        final StringBuilder text = new StringBuilder();
+        final Option[] parsed = this.commandLine.getOptions();
+        for (int idx = 0; idx < parsed.length; idx++) {
+            final Option current = parsed[idx];
+            text.append(" ").append(current.getOpt());
+            if (!current.hasArg()) {
+                continue;
+            }
+            text.append("=").append(current.getValue());
+        }
+        return text.toString();
+    }
+
+    /** Returns the value given for the option on the parsed command line. */
+    public String getOptionValue(Option option) {
+        final String shortName = option.getOpt();
+        return this.commandLine.getOptionValue(shortName);
+    }
+
+    /** Tells whether the option is present on the parsed command line. */
+    public boolean hasOption(Option option) {
+        final String shortName = option.getOpt();
+        return this.commandLine.hasOption(shortName);
+    }
+
+    /** Prints the usage help for the given program name and options. */
+    public void printUsage(String programName, Options options) {
+        new HelpFormatter().printHelp(programName, options);
+    }
+
+    /**
+     * Replaces the platform file separator in the path with '/'. The path is returned
+     * unchanged on platforms whose separator already is '/'.
+     */
+    public static String convertToFileURL(String path) {
+        final char separator = File.separatorChar;
+        return separator == '/' ? path : path.replace(separator, '/');
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
new file mode 100644
index 0000000..786698e
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.common;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.kylin.common.util.Pair;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.Logger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+/**
+ */
+public class ShellExecutable extends AbstractExecutable {
+
+ private static final String CMD = "cmd";
+
+ public ShellExecutable() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ try {
+ logger.info("executing:" + getCmd());
+ final ShellExecutableLogger logger = new ShellExecutableLogger();
+ final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger);
+ executableManager.addJobInfo(getId(), logger.getInfo());
+ return new ExecuteResult(result.getFirst() == 0? ExecuteResult.State.SUCCEED: ExecuteResult.State.FAILED, result.getSecond());
+ } catch (IOException e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+
+ public void setCmd(String cmd) {
+ setParam(CMD, cmd);
+ }
+
+ public String getCmd() {
+ return getParam(CMD);
+ }
+
+ private static class ShellExecutableLogger implements Logger {
+
+ private final Map<String, String> info = Maps.newHashMap();
+
+ private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
+ private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
+ private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
+ private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
+ private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
+ private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
+
+ // hive
+ private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
+ private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
+
+ @Override
+ public void log(String message) {
+ Matcher matcher = PATTERN_APP_ID.matcher(message);
+ if (matcher.find()) {
+ String appId = matcher.group(1);
+ info.put(ExecutableConstants.YARN_APP_ID, appId);
+ }
+
+ matcher = PATTERN_APP_URL.matcher(message);
+ if (matcher.find()) {
+ String appTrackingUrl = matcher.group(1);
+ info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
+ }
+
+ matcher = PATTERN_JOB_ID.matcher(message);
+ if (matcher.find()) {
+ String mrJobID = matcher.group(1);
+ info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
+ }
+
+ matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
+ if (matcher.find()) {
+ String hdfsWritten = matcher.group(1);
+ info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
+ }
+
+ matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
+ if (matcher.find()) {
+ String sourceCount = matcher.group(1);
+ info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
+ }
+
+ matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
+ if (matcher.find()) {
+ String sourceSize = matcher.group(1);
+ info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
+ }
+
+ // hive
+ matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
+ if (matcher.find()) {
+ String jobId = matcher.group(1);
+ String trackingUrl = matcher.group(2);
+ info.put(ExecutableConstants.MR_JOB_ID, jobId);
+ info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
+ }
+
+ matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
+ if (matcher.find()) {
+ // String hdfsRead = matcher.group(1);
+ String hdfsWritten = matcher.group(2);
+ info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
+ }
+ }
+
+ Map<String, String> getInfo() {
+ return info;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
new file mode 100644
index 0000000..fdcfdbe
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.constant;
+
+/**
+ */
+public final class ExecutableConstants {
+
+ private ExecutableConstants(){}
+
+ public static final String YARN_APP_ID = "yarn_application_id";
+
+ public static final String YARN_APP_URL = "yarn_application_tracking_url";
+ public static final String MR_JOB_ID = "mr_job_id";
+ public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
+ public static final String SOURCE_RECORDS_COUNT = "source_records_count";
+ public static final String SOURCE_RECORDS_SIZE = "source_records_size";
+ public static final String GLOBAL_LISTENER_NAME = "ChainListener";
+
+
+
+
+ public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60;
+
+ public static final String CUBE_JOB_GROUP_NAME = "cube_job_group";
+
+ public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group";
+ public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
+
+ public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
+ public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
+ public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
+ public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";
+ public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data";
+ public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
+ public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
+ public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
+ public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
+ public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
+ public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
+ public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
+ public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
+ public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
+ public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
+
+ public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
+ public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
+
+ public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
+ public static final String PROP_JOB_FLOW = "jobFlow";
+ public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid";
+ public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID";
+ public static final String PROP_COMMAND = "command";
+ // public static final String PROP_STORAGE_LOCATION =
+ // "storageLocationIdentifier";
+ public static final String PROP_JOB_ASYNC = "jobAsync";
+ public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor";
+ public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput";
+ public static final String PROP_JOB_KILLED = "jobKilled";
+ public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows";
+
+ public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
new file mode 100644
index 0000000..a4ef564
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.constant;
+
+public enum JobStatusEnum {
+
+ NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16);
+
+ private final int code;
+
+ private JobStatusEnum(int statusCode) {
+ this.code = statusCode;
+ }
+
+ public static JobStatusEnum getByCode(int statusCode) {
+ for (JobStatusEnum status : values()) {
+ if (status.getCode() == statusCode) {
+ return status;
+ }
+ }
+
+ return null;
+ }
+
+ public int getCode() {
+ return this.code;
+ }
+
+ public boolean isComplete() {
+ return code == JobStatusEnum.FINISHED.getCode() || code == JobStatusEnum.ERROR.getCode() || code == JobStatusEnum.DISCARDED.getCode();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java b/core-job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
new file mode 100644
index 0000000..02b40a3
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.constant;
+
+/**
+ * Enumerates the command types of a job step.
+ * <p>
+ * NOTE(review): {@code JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE} is spelled
+ * "HADDOP", unlike its siblings. The name is left as it is because renaming a
+ * public constant would break existing references.
+ *
+ * @author xduo, ysong1
+ */
+public enum JobStepCmdTypeEnum {
+    SHELL_CMD, //
+    SHELL_CMD_HADOOP, //
+    JAVA_CMD_HADOOP_FACTDISTINCT, //
+    JAVA_CMD_HADOOP_BASECUBOID, //
+    JAVA_CMD_HADOOP_NDCUBOID, //
+    JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, //
+    JAVA_CMD_HADOOP_CONVERTHFILE, //
+    JAVA_CMD_HADOOP_MERGECUBOID, //
+    JAVA_CMD_HADOOP_NO_MR_DICTIONARY, //
+    JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, //
+    JAVA_CMD_HADOOP_NO_MR_BULKLOAD
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java b/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
new file mode 100644
index 0000000..08ee79a
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.constant;
+
+public enum JobStepStatusEnum {
+ NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32), KILLED(64);
+
+ private final int code;
+
+ private JobStepStatusEnum(int statusCode) {
+ this.code = statusCode;
+ }
+
+ public static JobStepStatusEnum getByCode(int statusCode) {
+ for (JobStepStatusEnum status : values()) {
+ if (status.getCode() == statusCode) {
+ return status;
+ }
+ }
+
+ return null;
+ }
+
+ public int getCode() {
+ return this.code;
+ }
+
+ public boolean isComplete() {
+ return code == JobStepStatusEnum.FINISHED.getCode() || code == JobStepStatusEnum.ERROR.getCode() || code == JobStepStatusEnum.DISCARDED.getCode();
+ }
+
+ public boolean isRunable() {
+ return code == JobStepStatusEnum.PENDING.getCode() || code == JobStepStatusEnum.ERROR.getCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
new file mode 100644
index 0000000..4862bb1
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dao;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.metadata.MetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Data access object for job definitions ({@link ExecutablePO}) and job
+ * outputs ({@link ExecutableOutputPO}). Both are kept in the
+ * {@link ResourceStore} obtained from the {@link MetadataManager} of a
+ * {@link KylinConfig}: definitions under {@link #JOB_PATH_ROOT}, outputs under
+ * {@link #JOB_OUTPUT_ROOT}, each addressed by uuid.
+ * <p>
+ * Instances are cached per config; use {@link #getInstance(KylinConfig)}.
+ */
+public class ExecutableDao {
+
+    private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class);
+    private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
+    private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
+    private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
+    public static final String JOB_PATH_ROOT = "/execute";
+    public static final String JOB_OUTPUT_ROOT = "/execute_output";
+
+    private final ResourceStore store;
+
+    /**
+     * Returns the DAO cached for the given config, creating it on first use.
+     *
+     * @param config configuration whose metadata store backs the DAO
+     * @return the one DAO instance registered for {@code config}
+     */
+    public static ExecutableDao getInstance(KylinConfig config) {
+        ExecutableDao dao = CACHE.get(config);
+        if (dao == null) {
+            final ExecutableDao created = new ExecutableDao(config);
+            // putIfAbsent makes check-and-insert atomic: when two threads race,
+            // both end up returning the instance that was registered first.
+            dao = CACHE.putIfAbsent(config, created);
+            if (dao == null) {
+                dao = created;
+                if (CACHE.size() > 1) {
+                    logger.warn("More than one singleton exist");
+                }
+            }
+        }
+        return dao;
+    }
+
+    private ExecutableDao(KylinConfig config) {
+        logger.info("Using metadata url: " + config);
+        this.store = MetadataManager.getInstance(config).getStore();
+    }
+
+    /** Store path of a job definition, derived from its uuid. */
+    private String pathOfJob(ExecutablePO job) {
+        return pathOfJob(job.getUuid());
+    }
+
+    /** Store path of the job definition with the given uuid. */
+    private String pathOfJob(String uuid) {
+        return JOB_PATH_ROOT + "/" + uuid;
+    }
+
+    /** Store path of the job output with the given uuid. */
+    private String pathOfJobOutput(String uuid) {
+        return JOB_OUTPUT_ROOT + "/" + uuid;
+    }
+
+    private ExecutablePO readJobResource(String path) throws IOException {
+        return store.getResource(path, ExecutablePO.class, JOB_SERIALIZER);
+    }
+
+    private void writeJobResource(String path, ExecutablePO job) throws IOException {
+        store.putResource(path, job, JOB_SERIALIZER);
+    }
+
+    private ExecutableOutputPO readJobOutputResource(String path) throws IOException {
+        return store.getResource(path, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
+    }
+
+    /** Writes the output and passes on the value returned by the store's putResource. */
+    private long writeJobOutputResource(String path, ExecutableOutputPO output) throws IOException {
+        return store.putResource(path, output, JOB_OUTPUT_SERIALIZER);
+    }
+
+    /**
+     * Loads every job output stored under {@link #JOB_OUTPUT_ROOT}.
+     *
+     * @return all job outputs, or an empty list when none are stored
+     * @throws PersistentException if the store cannot be read
+     */
+    public List<ExecutableOutputPO> getJobOutputs() throws PersistentException {
+        try {
+            ArrayList<String> resources = store.listResources(JOB_OUTPUT_ROOT);
+            if (resources == null || resources.isEmpty()) {
+                return Collections.emptyList();
+            }
+            // sort so that the first and last paths bound the range to fetch
+            Collections.sort(resources);
+            String rangeStart = resources.get(0);
+            String rangeEnd = resources.get(resources.size() - 1);
+            return store.getAllResources(rangeStart, rangeEnd, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
+        } catch (IOException e) {
+            logger.error("error get all job outputs:", e);
+            throw new PersistentException(e);
+        }
+    }
+
+    /**
+     * Loads every job definition stored under {@link #JOB_PATH_ROOT}.
+     *
+     * @return all job definitions, or an empty list when none are stored
+     * @throws PersistentException if the store cannot be read
+     */
+    public List<ExecutablePO> getJobs() throws PersistentException {
+        try {
+            final List<String> jobIds = store.listResources(JOB_PATH_ROOT);
+            if (jobIds == null || jobIds.isEmpty()) {
+                return Collections.emptyList();
+            }
+            // sort so that the first and last paths bound the range to fetch
+            Collections.sort(jobIds);
+            String rangeStart = jobIds.get(0);
+            String rangeEnd = jobIds.get(jobIds.size() - 1);
+            return store.getAllResources(rangeStart, rangeEnd, ExecutablePO.class, JOB_SERIALIZER);
+        } catch (IOException e) {
+            logger.error("error get all Jobs:", e);
+            throw new PersistentException(e);
+        }
+    }
+
+    /**
+     * Lists the ids of all stored job definitions.
+     *
+     * @return the last path segment of every resource under {@link #JOB_PATH_ROOT};
+     *         empty when the store reports no listing
+     * @throws PersistentException if the store cannot be read
+     */
+    public List<String> getJobIds() throws PersistentException {
+        try {
+            ArrayList<String> resources = store.listResources(JOB_PATH_ROOT);
+            if (resources == null) {
+                return Collections.emptyList();
+            }
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(resources.size());
+            for (String path : resources) {
+                result.add(path.substring(path.lastIndexOf("/") + 1));
+            }
+            return result;
+        } catch (IOException e) {
+            logger.error("error get all Jobs:", e);
+            throw new PersistentException(e);
+        }
+    }
+
+    /**
+     * Reads one job definition.
+     *
+     * @param uuid id of the job
+     * @return whatever the store returns for that path ({@link #addJob} treats
+     *         {@code null} as "no such job")
+     * @throws PersistentException if the store cannot be read
+     */
+    public ExecutablePO getJob(String uuid) throws PersistentException {
+        try {
+            return readJobResource(pathOfJob(uuid));
+        } catch (IOException e) {
+            logger.error("error get job:" + uuid, e);
+            throw new PersistentException(e);
+        }
+    }
+
+    /**
+     * Stores a new job definition.
+     *
+     * @param job the job to store
+     * @return the same {@code job} instance
+     * @throws IllegalArgumentException if a job with the same uuid already exists
+     * @throws PersistentException if the store cannot be read or written
+     */
+    public ExecutablePO addJob(ExecutablePO job) throws PersistentException {
+        try {
+            if (getJob(job.getUuid()) != null) {
+                throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists");
+            }
+            writeJobResource(pathOfJob(job), job);
+            return job;
+        } catch (IOException e) {
+            logger.error("error save job:" + job.getUuid(), e);
+            throw new PersistentException(e);
+        }
+    }
+
+    /**
+     * Removes the job definition with the given uuid.
+     *
+     * @throws PersistentException if the store operation fails
+     */
+    public void deleteJob(String uuid) throws PersistentException {
+        try {
+            store.deleteResource(pathOfJob(uuid));
+        } catch (IOException e) {
+            logger.error("error delete job:" + uuid, e);
+            throw new PersistentException(e);
+        }
+    }
+
+    /**
+     * Reads the output of a job.
+     *
+     * @param uuid id of the job
+     * @return the stored output; when nothing is stored, a fresh
+     *         {@link ExecutableOutputPO} carrying only the uuid (never {@code null})
+     * @throws PersistentException if the store cannot be read
+     */
+    public ExecutableOutputPO getJobOutput(String uuid) throws PersistentException {
+        try {
+            ExecutableOutputPO result = readJobOutputResource(pathOfJobOutput(uuid));
+            if (result == null) {
+                result = new ExecutableOutputPO();
+                result.setUuid(uuid);
+            }
+            return result;
+        } catch (IOException e) {
+            logger.error("error get job output id:" + uuid, e);
+            throw new PersistentException(e);
+        }
+    }
+
+    /**
+     * Stores a job output after resetting its last-modified value to 0.
+     *
+     * @throws PersistentException if the store cannot be written
+     */
+    public void addJobOutput(ExecutableOutputPO output) throws PersistentException {
+        try {
+            // NOTE(review): presumably 0 marks the entity as not yet persisted - confirm against ResourceStore
+            output.setLastModified(0);
+            writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
+        } catch (IOException e) {
+            logger.error("error add job output id:" + output.getUuid(), e);
+            throw new PersistentException(e);
+        }
+    }
+
+    /**
+     * Overwrites a job output and records the value returned by the store as
+     * the output's new last-modified value.
+     *
+     * @throws PersistentException if the store cannot be written
+     */
+    public void updateJobOutput(ExecutableOutputPO output) throws PersistentException {
+        try {
+            final long ts = writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
+            output.setLastModified(ts);
+        } catch (IOException e) {
+            logger.error("error update job output id:" + output.getUuid(), e);
+            throw new PersistentException(e);
+        }
+    }
+
+    /**
+     * Removes the output of the job with the given uuid.
+     *
+     * @throws PersistentException if the store operation fails
+     */
+    public void deleteJobOutput(String uuid) throws PersistentException {
+        try {
+            store.deleteResource(pathOfJobOutput(uuid));
+        } catch (IOException e) {
+            logger.error("error delete job output:" + uuid, e);
+            throw new PersistentException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
new file mode 100644
index 0000000..4dacd8a
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dao;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+
+/**
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class ExecutableOutputPO extends RootPersistentEntity {
+
+ @JsonProperty("content")
+ private String content;
+
+ @JsonProperty("status")
+ private String status = "READY";
+
+ @JsonProperty("info")
+ private Map<String, String> info = Maps.newHashMap();
+
+ public String getContent() {
+ return content;
+ }
+
+ public void setContent(String content) {
+ this.content = content;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public Map<String, String> getInfo() {
+ return info;
+ }
+
+ public void setInfo(Map<String, String> info) {
+ this.info = info;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
new file mode 100644
index 0000000..6a17b29
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dao;
+
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+
+/**
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class ExecutablePO extends RootPersistentEntity {
+
+ @JsonProperty("name")
+ private String name;
+
+ @JsonProperty("tasks")
+ private List<ExecutablePO> tasks;
+
+ @JsonProperty("type")
+ private String type;
+
+ @JsonProperty("params")
+ private Map<String, String> params = Maps.newHashMap();
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public List<ExecutablePO> getTasks() {
+ return tasks;
+ }
+
+ public void setTasks(List<ExecutablePO> tasks) {
+ this.tasks = tasks;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public Map<String, String> getParams() {
+ return params;
+ }
+
+ public void setParams(Map<String, String> params) {
+ this.params = params;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
new file mode 100644
index 0000000..4d4d944
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.engine;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.common.OptionsHelper;
+import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job-engine view of a {@link KylinConfig}: locates the Hadoop job and Hive
+ * configuration XML files on disk and forwards a handful of settings from the
+ * wrapped config. Two instances are equal when their wrapped configs are equal.
+ *
+ * @author ysong1
+ */
+public class JobEngineConfig {
+    private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class);
+    // Base names (without ".xml") of the configuration files looked up below.
+    // NOTE(review): these are public and non-final; left as they are in case something reassigns them.
+    public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
+    public static String HIVE_CONF_FILENAME = "kylin_hive_conf";
+
+    // there should be no setters
+    private final KylinConfig config;
+
+    public JobEngineConfig(KylinConfig config) {
+        this.config = config;
+    }
+
+    /**
+     * Builds the location of a configuration file. The directory named by the
+     * system property {@code KylinConfig.KYLIN_CONF} wins when it is set;
+     * otherwise the {@code conf} directory under the Kylin home is used.
+     *
+     * @param fileName file name including extension
+     * @return the candidate file (existence is not checked), or {@code null}
+     *         when neither location is configured
+     */
+    private static File getJobConfig(String fileName) {
+        String path = System.getProperty(KylinConfig.KYLIN_CONF);
+        if (StringUtils.isNotEmpty(path)) {
+            return new File(path, fileName);
+        }
+
+        path = KylinConfig.getKylinHome();
+        if (StringUtils.isNotEmpty(path)) {
+            return new File(path + File.separator + "conf", fileName);
+        }
+        return null;
+    }
+
+    /**
+     * Resolves the Hadoop job configuration file, optionally the
+     * capacity-specific variant {@code <base>_<capacity>.xml}. When the
+     * requested file is missing, the plain {@code <base>.xml} is tried.
+     *
+     * @param capacity     capacity whose lower-cased name forms the suffix
+     * @param appendSuffix whether to look for the capacity-specific file first
+     * @return the result of {@code OptionsHelper.convertToFileURL} for the file found
+     * @throws RuntimeException if no configuration file can be located
+     */
+    private String getHadoopJobConfFilePath(RealizationCapacity capacity, boolean appendSuffix) throws IOException {
+        String hadoopJobConfFile;
+        if (appendSuffix) {
+            // Lower-case with a fixed locale: with the default locale a name
+            // containing 'I' becomes a dotless i under e.g. Turkish, and the
+            // suffixed file would never be found.
+            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + capacity.toString().toLowerCase(java.util.Locale.ROOT) + ".xml");
+        } else {
+            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml");
+        }
+
+        File jobConfig = getJobConfig(hadoopJobConfFile);
+        if (jobConfig == null || !jobConfig.exists()) {
+            logger.warn("fail to locate " + hadoopJobConfFile + ", trying to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
+            jobConfig = getJobConfig(HADOOP_JOB_CONF_FILENAME + ".xml");
+            if (jobConfig == null || !jobConfig.exists()) {
+                logger.error("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
+                throw new RuntimeException("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
+            }
+        }
+        return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
+    }
+
+    /**
+     * Chooses the Hadoop job configuration for the given capacity, preferring
+     * the capacity-specific file over the generic one.
+     *
+     * @param capaticy capacity of the realization being built
+     * @return the chosen configuration path, or an empty string if the lookup
+     *         produced an empty path
+     * @throws RuntimeException if no configuration file can be located
+     */
+    public String getHadoopJobConfFilePath(RealizationCapacity capaticy) throws IOException {
+        String path = getHadoopJobConfFilePath(capaticy, true);
+        if (StringUtils.isEmpty(path)) {
+            path = getHadoopJobConfFilePath(capaticy, false);
+        }
+        if (StringUtils.isEmpty(path)) {
+            return "";
+        }
+        logger.info("Chosen job conf is : " + path);
+        return path;
+    }
+
+    /**
+     * Locates the Hive configuration file {@code <HIVE_CONF_FILENAME>.xml}.
+     *
+     * @return the result of {@code OptionsHelper.convertToFileURL} for that file
+     * @throws RuntimeException if the file cannot be located
+     */
+    public String getHiveConfFilePath() throws IOException {
+        String hiveConfFile = (HIVE_CONF_FILENAME + ".xml");
+
+        File jobConfig = getJobConfig(hiveConfFile);
+        if (jobConfig == null || !jobConfig.exists()) {
+
+            logger.error("fail to locate " + HIVE_CONF_FILENAME + ".xml");
+            throw new RuntimeException("fail to locate " + HIVE_CONF_FILENAME + ".xml");
+        }
+        return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
+    }
+
+    /**
+     * @return the wrapped config
+     */
+    public KylinConfig getConfig() {
+        return config;
+    }
+
+    /**
+     * @return the HDFS working directory reported by the wrapped config
+     */
+    public String getHdfsWorkingDirectory() {
+        return config.getHdfsWorkingDirectory();
+    }
+
+    /**
+     * @return the maxConcurrentJobLimit
+     */
+    public int getMaxConcurrentJobLimit() {
+        return config.getMaxConcurrentJobLimit();
+    }
+
+    /**
+     * @return the timeZone
+     */
+    public String getTimeZone() {
+        return config.getTimeZone();
+    }
+
+    /**
+     * @return the adminDls
+     */
+    public String getAdminDls() {
+        return config.getAdminDls();
+    }
+
+    /**
+     * @return the jobStepTimeout
+     */
+    public long getJobStepTimeout() {
+        return config.getJobStepTimeout();
+    }
+
+    /**
+     * @return the asyncJobCheckInterval, read from the wrapped config's
+     *         {@code getYarnStatusCheckIntervalSeconds()}
+     */
+    public int getAsyncJobCheckInterval() {
+        return config.getYarnStatusCheckIntervalSeconds();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        // same value as before: prime * 1 + hash of the wrapped config (0 when null)
+        return prime + ((config == null) ? 0 : config.hashCode());
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        JobEngineConfig other = (JobEngineConfig) obj;
+        return (config == null) ? other.config == null : config.equals(other.config);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java b/core-job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
new file mode 100644
index 0000000..8544fff
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ * Checked exception type of the job module; every constructor simply
+ * delegates to the matching {@link Exception} constructor.
+ */
+public class ExecuteException extends Exception {
+
+    private static final long serialVersionUID = 5677121412192984281L;
+
+    /** Creates an exception without message or cause. */
+    public ExecuteException() {
+        super();
+    }
+
+    /**
+     * @param message detail message
+     */
+    public ExecuteException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message detail message
+     * @param cause   underlying cause
+     */
+    public ExecuteException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause underlying cause
+     */
+    public ExecuteException(Throwable cause) {
+        super(cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java b/core-job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
new file mode 100644
index 0000000..f19b0ca
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ * Unchecked exception declared in {@code org.apache.kylin.job.exception}.
+ *
+ * <p>The class carries no state or behaviour of its own: each constructor
+ * passes its arguments straight to the matching {@link RuntimeException}
+ * constructor.
+ *
+ * <p>NOTE(review): "Tranfer" looks like a misspelling of "Transfer". The
+ * spelling is part of the public class name, so it is left untouched here;
+ * renaming it would break every caller.
+ */
+public class IllegalStateTranferException extends RuntimeException {
+
+    private static final long serialVersionUID = 8466551519300132702L;
+
+    /**
+     * Creates an exception with full control over suppression and stack trace capture.
+     *
+     * @param message the detail message
+     * @param cause the failure being wrapped
+     * @param enableSuppression whether suppressed exceptions may be attached
+     * @param writableStackTrace whether the stack trace should be writable
+     */
+    public IllegalStateTranferException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
+    /**
+     * Creates an exception with both a detail message and an underlying failure.
+     *
+     * @param message the detail message
+     * @param cause the failure being wrapped
+     */
+    public IllegalStateTranferException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Wraps an underlying failure without supplying a separate message.
+     *
+     * @param cause the failure being wrapped
+     */
+    public IllegalStateTranferException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with a detail message and no cause.
+     *
+     * @param message the detail message
+     */
+    public IllegalStateTranferException(String message) {
+        super(message);
+    }
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public IllegalStateTranferException() {
+        super();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/JobException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/JobException.java b/core-job/src/main/java/org/apache/kylin/job/exception/JobException.java
new file mode 100644
index 0000000..ba4c52a
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/JobException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ * Checked exception declared in {@code org.apache.kylin.job.exception}.
+ *
+ * <p>The class carries no state or behaviour of its own: each constructor
+ * passes its arguments straight to the matching {@link Exception} constructor.
+ *
+ * @author xduo
+ */
+public class JobException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Wraps an underlying failure without supplying a separate message.
+     *
+     * @param cause the failure being wrapped
+     */
+    public JobException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with a detail message and no cause.
+     *
+     * @param message the detail message
+     */
+    public JobException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception with both a detail message and an underlying failure.
+     *
+     * @param message the detail message
+     * @param cause the failure being wrapped
+     */
+    public JobException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public JobException() {
+        // The no-argument superclass constructor is invoked implicitly.
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/LockException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/LockException.java b/core-job/src/main/java/org/apache/kylin/job/exception/LockException.java
new file mode 100644
index 0000000..cf43ac9
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/LockException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ * Checked exception declared in {@code org.apache.kylin.job.exception}.
+ *
+ * <p>The class carries no state or behaviour of its own: each constructor
+ * passes its arguments straight to the matching {@link Exception} constructor.
+ * NOTE(review): judging by the name this is presumably raised by the job
+ * locking code; confirm against the call sites.
+ */
+public class LockException extends Exception {
+
+    private static final long serialVersionUID = 2072745879281754945L;
+
+    /**
+     * Creates an exception with full control over suppression and stack trace capture.
+     *
+     * @param message the detail message
+     * @param cause the failure being wrapped
+     * @param enableSuppression whether suppressed exceptions may be attached
+     * @param writableStackTrace whether the stack trace should be writable
+     */
+    public LockException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
+    /**
+     * Creates an exception with both a detail message and an underlying failure.
+     *
+     * @param message the detail message
+     * @param cause the failure being wrapped
+     */
+    public LockException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Wraps an underlying failure without supplying a separate message.
+     *
+     * @param cause the failure being wrapped
+     */
+    public LockException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with a detail message and no cause.
+     *
+     * @param message the detail message
+     */
+    public LockException(String message) {
+        super(message);
+    }
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public LockException() {
+        super();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/PersistentException.java b/core-job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
new file mode 100644
index 0000000..8507a53
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ * Checked exception declared in {@code org.apache.kylin.job.exception}.
+ *
+ * <p>The class carries no state or behaviour of its own: each constructor
+ * passes its arguments straight to the matching {@link Exception} constructor.
+ * NOTE(review): judging by the name this is presumably raised when persisting
+ * job data fails; confirm against the call sites.
+ */
+public class PersistentException extends Exception {
+
+    private static final long serialVersionUID = -4239863858506718998L;
+
+    /**
+     * Creates an exception with full control over suppression and stack trace capture.
+     *
+     * @param message the detail message
+     * @param cause the failure being wrapped
+     * @param enableSuppression whether suppressed exceptions may be attached
+     * @param writableStackTrace whether the stack trace should be writable
+     */
+    public PersistentException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
+    /**
+     * Creates an exception with both a detail message and an underlying failure.
+     *
+     * @param message the detail message
+     * @param cause the failure being wrapped
+     */
+    public PersistentException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Wraps an underlying failure without supplying a separate message.
+     *
+     * @param cause the failure being wrapped
+     */
+    public PersistentException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with a detail message and no cause.
+     *
+     * @param message the detail message
+     */
+    public PersistentException(String message) {
+        super(message);
+    }
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public PersistentException() {
+        super();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java b/core-job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
new file mode 100644
index 0000000..057bd4a
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ * Checked exception declared in {@code org.apache.kylin.job.exception}.
+ *
+ * <p>The class carries no state or behaviour of its own: each constructor
+ * passes its arguments straight to the matching {@link Exception} constructor.
+ * NOTE(review): judging by the name this is presumably raised by the job
+ * scheduler; confirm against the call sites.
+ */
+public class SchedulerException extends Exception {
+
+    private static final long serialVersionUID = 349041244824274861L;
+
+    /**
+     * Creates an exception with full control over suppression and stack trace capture.
+     *
+     * @param message the detail message
+     * @param cause the failure being wrapped
+     * @param enableSuppression whether suppressed exceptions may be attached
+     * @param writableStackTrace whether the stack trace should be writable
+     */
+    public SchedulerException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
+    /**
+     * Creates an exception with both a detail message and an underlying failure.
+     *
+     * @param message the detail message
+     * @param cause the failure being wrapped
+     */
+    public SchedulerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Wraps an underlying failure without supplying a separate message.
+     *
+     * @param cause the failure being wrapped
+     */
+    public SchedulerException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with a detail message and no cause.
+     *
+     * @param message the detail message
+     */
+    public SchedulerException(String message) {
+        super(message);
+    }
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public SchedulerException() {
+        super();
+    }
+}