You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:43 UTC

[41/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
new file mode 100644
index 0000000..932da38
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.LookupDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+/**
+ * @author George Song (ysong1)
+ * 
+ */
+
+public class JoinedFlatTable {
+
+    public static final String FACT_TABLE_ALIAS = "FACT_TABLE";
+
+    public static final String LOOKUP_TABLE_ALAIS_PREFIX = "LOOKUP_";
+
+    public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+        return storageDfsDir + "/" + intermediateTableDesc.getTableName();
+    }
+
+    public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+        StringBuilder ddl = new StringBuilder();
+
+        ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName() + "\n");
+
+        ddl.append("(" + "\n");
+        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
+            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
+            if (i > 0) {
+                ddl.append(",");
+            }
+            ddl.append(colName(col.getCanonicalName()) + " " + getHiveDataType(col.getDataType()) + "\n");
+        }
+        ddl.append(")" + "\n");
+
+        ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
+        ddl.append("STORED AS SEQUENCEFILE" + "\n");
+        ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
+        // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
+        // ";\n");
+        return ddl.toString();
+    }
+
+    public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+        StringBuilder ddl = new StringBuilder();
+        ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName() + ";").append("\n");
+        return ddl.toString();
+    }
+
+    public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) throws IOException {
+        StringBuilder sql = new StringBuilder();
+
+        File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
+
+        if (hadoopPropertiesFile.exists()) {
+            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+            DocumentBuilder builder;
+            Document doc;
+            try {
+                builder = factory.newDocumentBuilder();
+                doc = builder.parse(hadoopPropertiesFile);
+                NodeList nl = doc.getElementsByTagName("property");
+                for (int i = 0; i < nl.getLength(); i++) {
+                    String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
+                    String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
+                    if (name.equals("tmpjars") == false) {
+                        sql.append("SET " + name + "=" + value + ";").append("\n");
+                    }
+                }
+
+            } catch (ParserConfigurationException e) {
+                throw new IOException(e);
+            } catch (SAXException e) {
+                throw new IOException(e);
+            }
+        }
+
+        sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
+
+        return sql.toString();
+    }
+
+    public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT" + "\n");
+        String tableAlias;
+        Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
+        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
+            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
+            if (i > 0) {
+                sql.append(",");
+            }
+            tableAlias = tableAliasMap.get(col.getTableName());
+            sql.append(tableAlias + "." + col.getColumnName() + "\n");
+        }
+        appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
+        appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+        return sql.toString();
+    }
+
+    private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) {
+        Map<String, String> tableAliasMap = new HashMap<String, String>();
+
+        tableAliasMap.put(dataModelDesc.getFactTable().toUpperCase(), FACT_TABLE_ALIAS);
+
+        int i = 1;
+        for (LookupDesc lookupDesc: dataModelDesc.getLookups()) {
+            JoinDesc join = lookupDesc.getJoin();
+            if (join != null) {
+                tableAliasMap.put(lookupDesc.getTable().toUpperCase(), LOOKUP_TABLE_ALAIS_PREFIX + i);
+                i++;
+            }
+
+        }
+        return tableAliasMap;
+    }
+
+    private static void appendJoinStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+        Set<String> dimTableCache = new HashSet<String>();
+
+        DataModelDesc dataModelDesc = intermediateTableDesc.getDataModel();
+        String factTableName = dataModelDesc.getFactTable();
+        String factTableAlias = tableAliasMap.get(factTableName);
+        sql.append("FROM " + factTableName + " as " + factTableAlias + " \n");
+
+        for (LookupDesc lookupDesc : dataModelDesc.getLookups()) {
+            JoinDesc join = lookupDesc.getJoin();
+            if (join != null && join.getType().equals("") == false) {
+                String joinType = join.getType().toUpperCase();
+                String dimTableName = lookupDesc.getTable();
+                if (!dimTableCache.contains(dimTableName)) {
+                    TblColRef[] pk = join.getPrimaryKeyColumns();
+                    TblColRef[] fk = join.getForeignKeyColumns();
+                    if (pk.length != fk.length) {
+                        throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
+                    }
+                    sql.append(joinType + " JOIN " + dimTableName + " as " + tableAliasMap.get(dimTableName) + "\n");
+                    sql.append("ON ");
+                    for (int i = 0; i < pk.length; i++) {
+                        if (i > 0) {
+                            sql.append(" AND ");
+                        }
+                        sql.append(factTableAlias + "." + fk[i].getName() + " = " + tableAliasMap.get(dimTableName) + "." + pk[i].getName());
+                    }
+                    sql.append("\n");
+
+                    dimTableCache.add(dimTableName);
+                }
+            }
+        }
+    }
+
+    private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+        if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
+            return;//TODO: for now only cube segments support filter and partition
+        }
+        CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
+
+        boolean hasCondition = false;
+        StringBuilder whereBuilder = new StringBuilder();
+        whereBuilder.append("WHERE");
+
+        CubeDesc cubeDesc = desc.getCubeDesc();
+        DataModelDesc model = cubeDesc.getModel();
+        
+        if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) {
+            whereBuilder.append(" (").append(model.getFilterCondition()).append(") ");
+            hasCondition = true;
+        }
+
+        CubeSegment cubeSegment = desc.getCubeSegment();
+
+        if (null != cubeSegment) {
+            PartitionDesc partDesc = model.getPartitionDesc();
+            long dateStart = cubeSegment.getDateRangeStart();
+            long dateEnd = cubeSegment.getDateRangeEnd();
+
+            if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) {
+                whereBuilder.append(hasCondition ? " AND (" : " (");
+                whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd, tableAliasMap));
+                whereBuilder.append(")\n");
+                hasCondition = true;
+            }
+        }
+
+        if (hasCondition) {
+            sql.append(whereBuilder.toString());
+        }
+    }
+
+    private static String colName(String canonicalColName) {
+        return canonicalColName.replace(".", "_");
+    }
+    
+    private static String getHiveDataType(String javaDataType) {
+        String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType;
+        hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType;
+
+        return hiveDataType.toLowerCase();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
new file mode 100644
index 0000000..2ed2fc2
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.lock.JobLock;
+
+/**
+ */
+public interface Scheduler<T extends Executable> {
+
+    void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException;
+
+    void shutdown() throws SchedulerException;
+
+    boolean stop(T executable) throws SchedulerException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
new file mode 100644
index 0000000..29b5324
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+/**
+ */
+public abstract class BaseCommandOutput implements ICommandOutput {
+
+    @Override
+    public void log(String message) {
+        this.appendOutput(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
new file mode 100644
index 0000000..6cab6a3
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.common.util.Logger;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+
+/**
+ * @author xjiang
+ * 
+ */
+public interface ICommandOutput extends Logger {
+
+    public void setStatus(JobStepStatusEnum status);
+
+    public JobStepStatusEnum getStatus();
+
+    public void appendOutput(String message);
+
+    public String getOutput();
+
+    public void setExitCode(int exitCode);
+
+    public int getExitCode();
+
+    public void reset();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
new file mode 100644
index 0000000..5a47173
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.job.exception.JobException;
+
+/**
+ * @author xjiang
+ * 
+ */
+public interface IJobCommand {
+
+    public ICommandOutput execute() throws JobException;
+
+    public void cancel() throws JobException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
new file mode 100644
index 0000000..6a718fc
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.*;
+
+/**
+ * @author xjiang
+ * 
+ */
+public class ShellCmd implements IJobCommand {
+
+    private static Logger log = LoggerFactory.getLogger(ShellCmd.class);
+
+    private final String executeCommand;
+    private final ICommandOutput output;
+    private final boolean isAsync;
+    private final CliCommandExecutor cliCommandExecutor;
+
+    private FutureTask<Integer> future;
+
+    private ShellCmd(String executeCmd, ICommandOutput out, String host, int port, String user, String password, boolean async) {
+        this.executeCommand = executeCmd;
+        this.output = out;
+        this.cliCommandExecutor = new CliCommandExecutor();
+        this.cliCommandExecutor.setRunAtRemote(host, port, user, password);
+        this.isAsync = async;
+    }
+
+    public ShellCmd(String executeCmd, String host, int port, String user, String password, boolean async) {
+        this(executeCmd, new ShellCmdOutput(), host, port, user, password, async);
+    }
+
+    @Override
+    public ICommandOutput execute() throws JobException {
+
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+        future = new FutureTask<Integer>(new Callable<Integer>() {
+            public Integer call() throws JobException, IOException {
+                executor.shutdown();
+                return executeCommand(executeCommand);
+            }
+        });
+        executor.execute(future);
+
+        int exitCode = -1;
+        if (!isAsync) {
+            try {
+                exitCode = future.get();
+                log.info("finish executing");
+            } catch (CancellationException e) {
+                log.debug("Command is cancelled");
+                exitCode = -2;
+            } catch (Exception e) {
+                throw new JobException("Error when execute job " + executeCommand, e);
+            } finally {
+                if (exitCode == 0) {
+                    output.setStatus(JobStepStatusEnum.FINISHED);
+                } else if (exitCode == -2) {
+                    output.setStatus(JobStepStatusEnum.DISCARDED);
+                } else {
+                    output.setStatus(JobStepStatusEnum.ERROR);
+                }
+                output.setExitCode(exitCode);
+            }
+        }
+        return output;
+    }
+
+    protected int executeCommand(String command) throws JobException, IOException {
+        output.reset();
+        output.setStatus(JobStepStatusEnum.RUNNING);
+        return cliCommandExecutor.execute(command, output).getFirst();
+    }
+
+    @Override
+    public void cancel() throws JobException {
+        future.cancel(true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
new file mode 100644
index 0000000..ebcad47
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+
+/**
+ * @author xjiang
+ * 
+ */
+public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
+
+    protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
+
+    protected StringBuilder output;
+    protected int exitCode;
+    protected JobStepStatusEnum status;
+
+    public ShellCmdOutput() {
+        init();
+    }
+
+    private void init() {
+        output = new StringBuilder();
+        exitCode = -1;
+        status = JobStepStatusEnum.NEW;
+    }
+
+    @Override
+    public JobStepStatusEnum getStatus() {
+        return status;
+    }
+
+    @Override
+    public void setStatus(JobStepStatusEnum s) {
+        this.status = s;
+    }
+
+    @Override
+    public String getOutput() {
+        return output.toString();
+    }
+
+    @Override
+    public void appendOutput(String message) {
+        output.append(message).append(System.getProperty("line.separator"));
+        log.debug(message);
+    }
+
+    @Override
+    public int getExitCode() {
+        return exitCode;
+    }
+
+    @Override
+    public void setExitCode(int code) {
+        exitCode = code;
+    }
+
+    @Override
+    public void reset() {
+        init();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java b/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
new file mode 100644
index 0000000..1cda348
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.common;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+/**
+ */
+public class OptionsHelper {
+    private CommandLine commandLine;
+
+    public void parseOptions(Options options, String[] args) throws ParseException {
+        CommandLineParser parser = new GnuParser();
+        commandLine = parser.parse(options, args);
+    }
+
+    public Option[] getOptions() {
+        return commandLine.getOptions();
+    }
+
+    public String getOptionsAsString() {
+        StringBuilder buf = new StringBuilder();
+        for (Option option : commandLine.getOptions()) {
+            buf.append(" ");
+            buf.append(option.getOpt());
+            if (option.hasArg()) {
+                buf.append("=");
+                buf.append(option.getValue());
+            }
+        }
+        return buf.toString();
+    }
+
+    public String getOptionValue(Option option) {
+        return commandLine.getOptionValue(option.getOpt());
+    }
+
+    public boolean hasOption(Option option) {
+        return commandLine.hasOption(option.getOpt());
+    }
+
+    public void printUsage(String programName, Options options) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(programName, options);
+    }
+
+    public static String convertToFileURL(String path) {
+        if (File.separatorChar != '/') {
+            path = path.replace(File.separatorChar, '/');
+        }
+
+        return path;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
new file mode 100644
index 0000000..786698e
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.common;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.kylin.common.util.Pair;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.Logger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+/**
+ */
+public class ShellExecutable extends AbstractExecutable {
+
+    private static final String CMD = "cmd";
+
+    public ShellExecutable() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        try {
+            logger.info("executing:" + getCmd());
+            final ShellExecutableLogger logger = new ShellExecutableLogger();
+            final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger);
+            executableManager.addJobInfo(getId(), logger.getInfo());
+            return new ExecuteResult(result.getFirst() == 0? ExecuteResult.State.SUCCEED: ExecuteResult.State.FAILED, result.getSecond());
+        } catch (IOException e) {
+            logger.error("job:" + getId() + " execute finished with exception", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+    public void setCmd(String cmd) {
+        setParam(CMD, cmd);
+    }
+
+    public String getCmd() {
+        return getParam(CMD);
+    }
+
+    private static class ShellExecutableLogger implements Logger {
+
+        private final Map<String, String> info = Maps.newHashMap();
+
+        private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
+        private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
+        private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
+        private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
+        private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
+        private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
+
+        // hive
+        private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
+        private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
+
+        @Override
+        public void log(String message) {
+            Matcher matcher = PATTERN_APP_ID.matcher(message);
+            if (matcher.find()) {
+                String appId = matcher.group(1);
+                info.put(ExecutableConstants.YARN_APP_ID, appId);
+            }
+
+            matcher = PATTERN_APP_URL.matcher(message);
+            if (matcher.find()) {
+                String appTrackingUrl = matcher.group(1);
+                info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
+            }
+
+            matcher = PATTERN_JOB_ID.matcher(message);
+            if (matcher.find()) {
+                String mrJobID = matcher.group(1);
+                info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
+            }
+
+            matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
+            if (matcher.find()) {
+                String hdfsWritten = matcher.group(1);
+                info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
+            }
+
+            matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
+            if (matcher.find()) {
+                String sourceCount = matcher.group(1);
+                info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
+            }
+
+            matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
+            if (matcher.find()) {
+                String sourceSize = matcher.group(1);
+                info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
+            }
+
+            // hive
+            matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
+            if (matcher.find()) {
+                String jobId = matcher.group(1);
+                String trackingUrl = matcher.group(2);
+                info.put(ExecutableConstants.MR_JOB_ID, jobId);
+                info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
+            }
+
+            matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
+            if (matcher.find()) {
+                // String hdfsRead = matcher.group(1);
+                String hdfsWritten = matcher.group(2);
+                info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
+            }
+        }
+
+        Map<String, String> getInfo() {
+            return info;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
new file mode 100644
index 0000000..fdcfdbe
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.constant;
+
+/**
+ */
+public final class ExecutableConstants {
+
+    private ExecutableConstants(){}
+
+    public static final String YARN_APP_ID = "yarn_application_id";
+
+    public static final String YARN_APP_URL = "yarn_application_tracking_url";
+    public static final String MR_JOB_ID = "mr_job_id";
+    public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
+    public static final String SOURCE_RECORDS_COUNT = "source_records_count";
+    public static final String SOURCE_RECORDS_SIZE = "source_records_size";
+    public static final String GLOBAL_LISTENER_NAME = "ChainListener";
+
+
+
+
+    public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60;
+
+    public static final String CUBE_JOB_GROUP_NAME = "cube_job_group";
+
+    public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group";
+    public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
+
+    public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
+    public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
+    public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
+    public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";
+    public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data";
+    public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
+    public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
+    public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
+    public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
+    public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
+    public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
+    public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
+    public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
+    public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
+    public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
+    
+    public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
+    public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
+
+    public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
+    public static final String PROP_JOB_FLOW = "jobFlow";
+    public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid";
+    public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID";
+    public static final String PROP_COMMAND = "command";
+    // public static final String PROP_STORAGE_LOCATION =
+    // "storageLocationIdentifier";
+    public static final String PROP_JOB_ASYNC = "jobAsync";
+    public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor";
+    public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput";
+    public static final String PROP_JOB_KILLED = "jobKilled";
+    public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows";
+
+    public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
new file mode 100644
index 0000000..a4ef564
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.constant;
+
+public enum JobStatusEnum {
+
+    NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16);
+
+    private final int code;
+
+    private JobStatusEnum(int statusCode) {
+        this.code = statusCode;
+    }
+
+    public static JobStatusEnum getByCode(int statusCode) {
+        for (JobStatusEnum status : values()) {
+            if (status.getCode() == statusCode) {
+                return status;
+            }
+        }
+
+        return null;
+    }
+
+    public int getCode() {
+        return this.code;
+    }
+
+    public boolean isComplete() {
+        return code == JobStatusEnum.FINISHED.getCode() || code == JobStatusEnum.ERROR.getCode() || code == JobStatusEnum.DISCARDED.getCode();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java b/core-job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
new file mode 100644
index 0000000..02b40a3
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.constant;
+
+/**
+ * @author xduo, ysong1
+ * 
+ */
+public enum JobStepCmdTypeEnum {
+    SHELL_CMD, SHELL_CMD_HADOOP, JAVA_CMD_HADOOP_FACTDISTINCT, JAVA_CMD_HADOOP_BASECUBOID, JAVA_CMD_HADOOP_NDCUBOID, JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, JAVA_CMD_HADOOP_CONVERTHFILE, JAVA_CMD_HADOOP_MERGECUBOID, JAVA_CMD_HADOOP_NO_MR_DICTIONARY, JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, JAVA_CMD_HADOOP_NO_MR_BULKLOAD
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java b/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
new file mode 100644
index 0000000..08ee79a
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.constant;
+
+public enum JobStepStatusEnum {
+    NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32), KILLED(64);
+
+    private final int code;
+
+    private JobStepStatusEnum(int statusCode) {
+        this.code = statusCode;
+    }
+
+    public static JobStepStatusEnum getByCode(int statusCode) {
+        for (JobStepStatusEnum status : values()) {
+            if (status.getCode() == statusCode) {
+                return status;
+            }
+        }
+
+        return null;
+    }
+
+    public int getCode() {
+        return this.code;
+    }
+
+    public boolean isComplete() {
+        return code == JobStepStatusEnum.FINISHED.getCode() || code == JobStepStatusEnum.ERROR.getCode() || code == JobStepStatusEnum.DISCARDED.getCode();
+    }
+
+    public boolean isRunable() {
+        return code == JobStepStatusEnum.PENDING.getCode() || code == JobStepStatusEnum.ERROR.getCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
new file mode 100644
index 0000000..4862bb1
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dao;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.metadata.MetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ */
+public class ExecutableDao {
+
+    private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class);
+    private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
+    private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
+    private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
+    public static final String JOB_PATH_ROOT = "/execute";
+    public static final String JOB_OUTPUT_ROOT = "/execute_output";
+
+    private ResourceStore store;
+
+    public static ExecutableDao getInstance(KylinConfig config) {
+        ExecutableDao r = CACHE.get(config);
+        if (r == null) {
+            r = new ExecutableDao(config);
+            CACHE.put(config, r);
+            if (CACHE.size() > 1) {
+                logger.warn("More than one singleton exist");
+            }
+
+        }
+        return r;
+    }
+
+    private ExecutableDao(KylinConfig config) {
+        logger.info("Using metadata url: " + config);
+        this.store = MetadataManager.getInstance(config).getStore();
+    }
+
+    private String pathOfJob(ExecutablePO job) {
+        return pathOfJob(job.getUuid());
+    }
+    private String pathOfJob(String uuid) {
+        return JOB_PATH_ROOT + "/" + uuid;
+    }
+
+    private String pathOfJobOutput(String uuid) {
+        return JOB_OUTPUT_ROOT + "/" + uuid;
+    }
+
+    private ExecutablePO readJobResource(String path) throws IOException {
+        return store.getResource(path, ExecutablePO.class, JOB_SERIALIZER);
+    }
+
+    private void writeJobResource(String path, ExecutablePO job) throws IOException {
+        store.putResource(path, job, JOB_SERIALIZER);
+    }
+
+    private ExecutableOutputPO readJobOutputResource(String path) throws IOException {
+        return store.getResource(path, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
+    }
+
+    private long writeJobOutputResource(String path, ExecutableOutputPO output) throws IOException {
+        return store.putResource(path, output, JOB_OUTPUT_SERIALIZER);
+    }
+
+    public List<ExecutableOutputPO> getJobOutputs() throws PersistentException {
+        try {
+            ArrayList<String> resources = store.listResources(JOB_OUTPUT_ROOT);
+            if (resources == null || resources.isEmpty()) {
+                return Collections.emptyList();
+            }
+            Collections.sort(resources);
+            String rangeStart = resources.get(0);
+            String rangeEnd = resources.get(resources.size() - 1);
+            return store.getAllResources(rangeStart, rangeEnd, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
+        } catch (IOException e) {
+            logger.error("error get all Jobs:", e);
+            throw new PersistentException(e);
+        }
+    }
+
+    public List<ExecutablePO> getJobs() throws PersistentException {
+        try {
+            final List<String> jobIds = store.listResources(JOB_PATH_ROOT);
+            if (jobIds == null || jobIds.isEmpty()) {
+                return Collections.emptyList();
+            }
+            Collections.sort(jobIds);
+            String rangeStart = jobIds.get(0);
+            String rangeEnd = jobIds.get(jobIds.size() - 1);
+            return store.getAllResources(rangeStart, rangeEnd, ExecutablePO.class, JOB_SERIALIZER);
+        } catch (IOException e) {
+            logger.error("error get all Jobs:", e);
+            throw new PersistentException(e);
+        }
+    }
+
+    public List<String> getJobIds() throws PersistentException {
+        try {
+            ArrayList<String> resources = store.listResources(JOB_PATH_ROOT);
+            if (resources == null) {
+                return Collections.emptyList();
+            }
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(resources.size());
+            for (String path : resources) {
+                result.add(path.substring(path.lastIndexOf("/") + 1));
+            }
+            return result;
+        } catch (IOException e) {
+            logger.error("error get all Jobs:", e);
+            throw new PersistentException(e);
+        }
+    }
+
+    public ExecutablePO getJob(String uuid) throws PersistentException {
+        try {
+            return readJobResource(pathOfJob(uuid));
+        } catch (IOException e) {
+            logger.error("error get job:" + uuid, e);
+            throw new PersistentException(e);
+        }
+    }
+
+    public ExecutablePO addJob(ExecutablePO job) throws PersistentException {
+        try {
+            if (getJob(job.getUuid()) != null) {
+                throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists");
+            }
+            writeJobResource(pathOfJob(job), job);
+            return job;
+        } catch (IOException e) {
+            logger.error("error save job:" + job.getUuid(), e);
+            throw new PersistentException(e);
+        }
+    }
+
+    public void deleteJob(String uuid) throws PersistentException {
+        try {
+            store.deleteResource(pathOfJob(uuid));
+        } catch (IOException e) {
+            logger.error("error delete job:" + uuid, e);
+            throw new PersistentException(e);
+        }
+    }
+
+    public ExecutableOutputPO getJobOutput(String uuid) throws PersistentException {
+        try {
+            ExecutableOutputPO result = readJobOutputResource(pathOfJobOutput(uuid));
+            if (result == null) {
+                result = new ExecutableOutputPO();
+                result.setUuid(uuid);
+                return result;
+            }
+            return result;
+        } catch (IOException e) {
+            logger.error("error get job output id:" + uuid, e);
+            throw new PersistentException(e);
+        }
+    }
+
+    public void addJobOutput(ExecutableOutputPO output) throws PersistentException {
+        try {
+            output.setLastModified(0);
+            writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
+        } catch (IOException e) {
+            logger.error("error update job output id:" + output.getUuid(), e);
+            throw new PersistentException(e);
+        }
+    }
+
+    public void updateJobOutput(ExecutableOutputPO output) throws PersistentException {
+        try {
+            final long ts = writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
+            output.setLastModified(ts);
+        } catch (IOException e) {
+            logger.error("error update job output id:" + output.getUuid(), e);
+            throw new PersistentException(e);
+        }
+    }
+
+    public void deleteJobOutput(String uuid) throws PersistentException {
+        try {
+            store.deleteResource(pathOfJobOutput(uuid));
+        } catch (IOException e) {
+            logger.error("error delete job:" + uuid, e);
+            throw new PersistentException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
new file mode 100644
index 0000000..4dacd8a
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dao;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+
+/**
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class ExecutableOutputPO extends RootPersistentEntity {
+
+    @JsonProperty("content")
+    private String content;
+
+    @JsonProperty("status")
+    private String status = "READY";
+
+    @JsonProperty("info")
+    private Map<String, String> info = Maps.newHashMap();
+
+    public String getContent() {
+        return content;
+    }
+
+    public void setContent(String content) {
+        this.content = content;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public Map<String, String> getInfo() {
+        return info;
+    }
+
+    public void setInfo(Map<String, String> info) {
+        this.info = info;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
new file mode 100644
index 0000000..6a17b29
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dao;
+
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+
+/**
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class ExecutablePO extends RootPersistentEntity {
+
+    @JsonProperty("name")
+    private String name;
+
+    @JsonProperty("tasks")
+    private List<ExecutablePO> tasks;
+
+    @JsonProperty("type")
+    private String type;
+
+    @JsonProperty("params")
+    private Map<String, String> params = Maps.newHashMap();
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public List<ExecutablePO> getTasks() {
+        return tasks;
+    }
+
+    public void setTasks(List<ExecutablePO> tasks) {
+        this.tasks = tasks;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public Map<String, String> getParams() {
+        return params;
+    }
+
+    public void setParams(Map<String, String> params) {
+        this.params = params;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
new file mode 100644
index 0000000..4d4d944
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.engine;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.common.OptionsHelper;
+import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author ysong1
+ */
+public class JobEngineConfig {
+    private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class);
+    public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
+    public static String HIVE_CONF_FILENAME = "kylin_hive_conf";
+
+    private static File getJobConfig(String fileName) {
+        String path = System.getProperty(KylinConfig.KYLIN_CONF);
+        if (StringUtils.isNotEmpty(path)) {
+            return new File(path, fileName);
+        }
+
+        path = KylinConfig.getKylinHome();
+        if (StringUtils.isNotEmpty(path)) {
+            return new File(path + File.separator + "conf", fileName);
+        }
+        return null;
+    }
+
+    private String getHadoopJobConfFilePath(RealizationCapacity capaticy, boolean appendSuffix) throws IOException {
+        String hadoopJobConfFile;
+        if (appendSuffix) {
+            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + capaticy.toString().toLowerCase() + ".xml");
+        } else {
+            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml");
+        }
+
+        File jobConfig = getJobConfig(hadoopJobConfFile);
+        if (jobConfig == null || !jobConfig.exists()) {
+            logger.warn("fail to locate " + hadoopJobConfFile + ", trying to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
+            jobConfig = getJobConfig(HADOOP_JOB_CONF_FILENAME + ".xml");
+            if (jobConfig == null || !jobConfig.exists()) {
+                logger.error("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
+                throw new RuntimeException("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
+            }
+        }
+        return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
+    }
+
+    public String getHadoopJobConfFilePath(RealizationCapacity capaticy) throws IOException {
+        String path = getHadoopJobConfFilePath(capaticy, true);
+        if (!StringUtils.isEmpty(path)) {
+            logger.info("Chosen job conf is : " + path);
+            return path;
+        } else {
+            path = getHadoopJobConfFilePath(capaticy, false);
+            if (!StringUtils.isEmpty(path)) {
+                logger.info("Chosen job conf is : " + path);
+                return path;
+            }
+        }
+        return "";
+    }
+
+
+    public String getHiveConfFilePath() throws IOException {
+        String hiveConfFile = (HIVE_CONF_FILENAME + ".xml");
+
+        File jobConfig = getJobConfig(hiveConfFile);
+        if (jobConfig == null || !jobConfig.exists()) {
+
+            logger.error("fail to locate " + HIVE_CONF_FILENAME + ".xml");
+            throw new RuntimeException("fail to locate " + HIVE_CONF_FILENAME + ".xml");
+        }
+        return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
+    }
+
+    // there should be no setters
+    private final KylinConfig config;
+
+    public JobEngineConfig(KylinConfig config) {
+        this.config = config;
+    }
+
+    public KylinConfig getConfig() {
+        return config;
+    }
+
+    public String getHdfsWorkingDirectory() {
+        return config.getHdfsWorkingDirectory();
+    }
+    
+    /**
+     * @return the maxConcurrentJobLimit
+     */
+    public int getMaxConcurrentJobLimit() {
+        return config.getMaxConcurrentJobLimit();
+    }
+
+    /**
+     * @return the timeZone
+     */
+    public String getTimeZone() {
+        return config.getTimeZone();
+    }
+
+    /**
+     * @return the adminDls
+     */
+    public String getAdminDls() {
+        return config.getAdminDls();
+    }
+
+    /**
+     * @return the jobStepTimeout
+     */
+    public long getJobStepTimeout() {
+        return config.getJobStepTimeout();
+    }
+
+    /**
+     * @return the asyncJobCheckInterval
+     */
+    public int getAsyncJobCheckInterval() {
+        return config.getYarnStatusCheckIntervalSeconds();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((config == null) ? 0 : config.hashCode());
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        JobEngineConfig other = (JobEngineConfig) obj;
+        if (config == null) {
+            if (other.config != null)
+                return false;
+        } else if (!config.equals(other.config))
+            return false;
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java b/core-job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
new file mode 100644
index 0000000..8544fff
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ */
+public class ExecuteException extends Exception {
+
+    private static final long serialVersionUID = 5677121412192984281L;
+
+    public ExecuteException() {
+    }
+
+    public ExecuteException(String message) {
+        super(message);
+    }
+
+    public ExecuteException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ExecuteException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java b/core-job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
new file mode 100644
index 0000000..f19b0ca
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ */
+public class IllegalStateTranferException extends RuntimeException {
+
+    private static final long serialVersionUID = 8466551519300132702L;
+
+    public IllegalStateTranferException() {
+    }
+
+    public IllegalStateTranferException(String message) {
+        super(message);
+    }
+
+    public IllegalStateTranferException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public IllegalStateTranferException(Throwable cause) {
+        super(cause);
+    }
+
+    public IllegalStateTranferException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/JobException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/JobException.java b/core-job/src/main/java/org/apache/kylin/job/exception/JobException.java
new file mode 100644
index 0000000..ba4c52a
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/JobException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ * @author xduo
+ * 
+ */
+public class JobException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 
+     */
+    public JobException() {
+        super();
+    }
+
+    /**
+     * @param message
+     * @param cause
+     */
+    public JobException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message
+     */
+    public JobException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause
+     */
+    public JobException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/LockException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/LockException.java b/core-job/src/main/java/org/apache/kylin/job/exception/LockException.java
new file mode 100644
index 0000000..cf43ac9
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/LockException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ */
+public class LockException extends Exception {
+    private static final long serialVersionUID = 2072745879281754945L;
+
+    public LockException() {
+    }
+
+    public LockException(String message) {
+        super(message);
+    }
+
+    public LockException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public LockException(Throwable cause) {
+        super(cause);
+    }
+
+    public LockException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/PersistentException.java b/core-job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
new file mode 100644
index 0000000..8507a53
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ */
+public class PersistentException extends Exception {
+    private static final long serialVersionUID = -4239863858506718998L;
+
+    public PersistentException() {
+    }
+
+    public PersistentException(String message) {
+        super(message);
+    }
+
+    public PersistentException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public PersistentException(Throwable cause) {
+        super(cause);
+    }
+
+    public PersistentException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java b/core-job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
new file mode 100644
index 0000000..057bd4a
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ */
+public class SchedulerException extends Exception {
+    private static final long serialVersionUID = 349041244824274861L;
+
+    public SchedulerException() {
+    }
+
+    public SchedulerException(String message) {
+        super(message);
+    }
+
+    public SchedulerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SchedulerException(Throwable cause) {
+        super(cause);
+    }
+
+    public SchedulerException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}