You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/10/27 14:05:53 UTC

[3/6] kylin git commit: APACHE-KYLIN-2723: collect job related metrics

APACHE-KYLIN-2723: collect job related metrics

Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ff0d0edc
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ff0d0edc
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ff0d0edc

Branch: refs/heads/master
Commit: ff0d0edc4f71a112dac347656a6c5bdc786975a9
Parents: 4e3b515
Author: Zhong <nj...@apache.org>
Authored: Thu Aug 10 17:47:32 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Fri Oct 27 21:58:08 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  9 ++
 .../kylin/job/common/ShellExecutable.java       |  7 +-
 .../kylin/job/exception/ShellException.java     | 43 +++++++++
 .../job/execution/DefaultChainedExecutable.java | 18 ++--
 .../kylin/job/execution/ExecuteResult.java      | 30 ++++++
 .../kylin/metadata/project/ProjectInstance.java |  7 ++
 .../metrics/job/JobRecordEventWrapper.java      | 23 +----
 engine-mr/pom.xml                               |  4 +
 .../org/apache/kylin/engine/mr/CubingJob.java   | 96 +++++++++++++++++++-
 .../engine/mr/common/HadoopShellExecutable.java |  8 +-
 .../engine/mr/common/MapReduceExecutable.java   |  9 +-
 .../mr/exception/HadoopShellException.java      | 44 +++++++++
 .../engine/mr/exception/MapReduceException.java | 43 +++++++++
 .../mr/exception/SegmentNotFoundException.java  | 44 +++++++++
 .../engine/mr/steps/MergeDictionaryStep.java    |  2 +-
 .../engine/mr/steps/MergeStatisticsStep.java    |  2 +-
 .../engine/mr/steps/SaveStatisticsStep.java     |  2 +-
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  |  2 +-
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java  |  9 +-
 .../kylin/engine/spark/SparkExecutable.java     |  2 +-
 .../source/hive/CreateFlatHiveTableStep.java    |  2 +-
 .../apache/kylin/source/hive/HiveMRInput.java   |  4 +-
 .../hive/exception/SegmentEmptyException.java   | 44 +++++++++
 .../org/apache/kylin/source/jdbc/CmdStep.java   |  2 +-
 .../apache/kylin/source/jdbc/HiveCmdStep.java   |  2 +-
 .../apache/kylin/source/kafka/KafkaMRInput.java |  2 +-
 .../kylin/source/kafka/job/MergeOffsetStep.java |  2 +-
 .../storage/hbase/steps/DeprecatedGCStep.java   |  2 +-
 .../steps/HDFSPathGarbageCollectionStep.java    |  2 +-
 .../kylin/storage/hbase/steps/MergeGCStep.java  |  2 +-
 30 files changed, 413 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index c988b9f..3a9a05e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1350,6 +1350,15 @@ abstract public class KylinConfigBase implements Serializable {
         return getDeployEnv();
     }
 
+    public String getKylinMetricsSubjectJob() {
+        return getOptional("kylin.core.metrics.subject-job", "METRICS_JOB") + "_" + getKylinMetricsSubjectSuffix();
+    }
+
+    public String getKylinMetricsSubjectJobException() {
+        return getOptional("kylin.core.metrics.subject-job-exception", "METRICS_JOB_EXCEPTION") + "_"
+                + getKylinMetricsSubjectSuffix();
+    }
+
     public String getKylinMetricsSubjectQuery() {
         return getOptional("kylin.core.metrics.subject-query", "METRICS_QUERY") + "_" + getKylinMetricsSubjectSuffix();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
index 9f431b0..070ac23 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
@@ -19,8 +19,10 @@
 package org.apache.kylin.job.common;
 
 import java.io.IOException;
+
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.ShellException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
@@ -45,10 +47,11 @@ public class ShellExecutable extends AbstractExecutable {
             final PatternedLogger patternedLogger = new PatternedLogger(logger);
             final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), patternedLogger);
             getManager().addJobInfo(getId(), patternedLogger.getInfo());
-            return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond());
+            return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED,
+                    new ShellException(result.getSecond()));
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java b/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java
new file mode 100644
index 0000000..443bd9b
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/** Checked exception used by ShellExecutable to carry a shell command's output inside an ExecuteResult.
+ */
+public class ShellException extends Exception {
+
+    public ShellException() {
+    }
+
+    public ShellException(String message) {
+        super(message);
+    }
+
+    public ShellException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ShellException(Throwable cause) {
+        super(cause);
+    }
+
+    public ShellException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index cbd49ae..77d5efa 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -64,7 +64,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                 return subTask.execute(context);
             }
         }
-        return new ExecuteResult(ExecuteResult.State.SUCCEED, null);
+        return new ExecuteResult(ExecuteResult.State.SUCCEED);
     }
 
     @Override
@@ -82,7 +82,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
     @Override
     protected void onExecuteError(Throwable exception, ExecutableContext executableContext) {
         super.onExecuteError(exception, executableContext);
-        notifyUserStatusChange(executableContext, ExecutableState.ERROR);
+        onStatusChange(executableContext, new ExecuteResult(exception), ExecutableState.ERROR);
     }
 
     @Override
@@ -91,10 +91,10 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         
         if (isDiscarded()) {
             setEndTime(System.currentTimeMillis());
-            notifyUserStatusChange(executableContext, ExecutableState.DISCARDED);
+            onStatusChange(executableContext, result, ExecutableState.DISCARDED);
         } else if (isPaused()) {
             setEndTime(System.currentTimeMillis());
-            notifyUserStatusChange(executableContext, ExecutableState.STOPPED);
+            onStatusChange(executableContext, result, ExecutableState.STOPPED);
         } else if (result.succeed()) {
             List<? extends Executable> jobs = getTasks();
             boolean allSucceed = true;
@@ -119,11 +119,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
             if (allSucceed) {
                 setEndTime(System.currentTimeMillis());
                 mgr.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null);
-                notifyUserStatusChange(executableContext, ExecutableState.SUCCEED);
+                onStatusChange(executableContext, result, ExecutableState.SUCCEED);
             } else if (hasError) {
                 setEndTime(System.currentTimeMillis());
                 mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
-                notifyUserStatusChange(executableContext, ExecutableState.ERROR);
+                onStatusChange(executableContext, result, ExecutableState.ERROR);
             } else if (hasRunning) {
                 mgr.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
             } else if (hasDiscarded) {
@@ -135,10 +135,14 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         } else {
             setEndTime(System.currentTimeMillis());
             mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
-            notifyUserStatusChange(executableContext, ExecutableState.ERROR);
+            onStatusChange(executableContext, result, ExecutableState.ERROR);
         }
     }
 
+    protected void onStatusChange(ExecutableContext context, ExecuteResult result, ExecutableState state) {
+        notifyUserStatusChange(context, state);
+    }
+
     @Override
     public List<AbstractExecutable> getTasks() {
         return subTasks;

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
index 760a574..139c04b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
@@ -30,6 +30,14 @@ public final class ExecuteResult {
 
     private final State state;
     private final String output;
+    private final Throwable throwable;
+
+    /**
+     * Default constructor to indicate a success ExecuteResult.
+     */
+    public ExecuteResult() {
+        this(State.SUCCEED, "succeed");
+    }
 
     public ExecuteResult(State state) {
         this(state, "");
@@ -39,6 +47,24 @@ public final class ExecuteResult {
         Preconditions.checkArgument(state != null, "state cannot be null");
         this.state = state;
         this.output = output;
+        this.throwable = null;
+    }
+
+    public ExecuteResult(State state, Throwable throwable) { // result with an explicit state plus its cause
+        Preconditions.checkArgument(state != null, "state cannot be null");
+        this.state = state;
+        this.throwable = throwable; // may be null; getThrowable() callers already null-check it
+        this.output = throwable == null ? null : throwable.getMessage(); // null-safe: no cause means no output
+    }
+
+    public ExecuteResult(Throwable throwable) {
+        this(throwable, throwable.getMessage());
+    }
+
+    public ExecuteResult(Throwable throwable, String output) {
+        this.state = State.ERROR;
+        this.throwable = throwable;
+        this.output = output;
     }
 
     public State state() {
@@ -52,4 +78,8 @@ public final class ExecuteResult {
     public String output() {
         return output;
     }
+
+    public Throwable getThrowable() {
+        return throwable;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 5f0e6a3..1f54416 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -92,6 +92,13 @@ public class ProjectInstance extends RootPersistentEntity {
         return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + projectName + ".json";
     }
 
+    public static String getNormalizedProjectName(String project) {
+        if (project == null)
+            throw new IllegalStateException("Trying to normalized a project name which is null");
+        // Upper-case with a fixed locale: the default-locale overload maps 'i' differently under e.g. Turkish.
+        return project.toUpperCase(java.util.Locale.ROOT);
+    }
+
     public static ProjectInstance create(String name, String owner, String description, LinkedHashMap<String, String> overrideProps, List<RealizationEntry> realizationEntries, List<String> models) {
         ProjectInstance projectInstance = new ProjectInstance();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
index 537388c..6cd197e 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
@@ -39,7 +39,7 @@ public class JobRecordEventWrapper extends RecordEventWrapper {
         this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), 0L);
         this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), 0L);
         this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), 0L);
-        setDependentStats();
+        this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 0L);
     }
 
     public void setWrapper(String projectName, String cubeName, String jobId, String jobType, String cubingType) {
@@ -50,12 +50,13 @@ public class JobRecordEventWrapper extends RecordEventWrapper {
         this.metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType);
     }
 
-    public void setStats(long tableSize, long cubeSize, long buildDuration, long waitResourceTime) {
+    public void setStats(long tableSize, long cubeSize, long buildDuration, long waitResourceTime,
+            double perBytesTimeCost) {
         this.metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), tableSize);
         this.metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize);
         this.metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), buildDuration);
         this.metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), waitResourceTime);
-        setDependentStats();
+        this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), perBytesTimeCost);
     }
 
     public void setStepStats(long dColumnDistinct, long dDictBuilding, long dCubingInmem, long dHfileConvert) {
@@ -64,20 +65,4 @@ public class JobRecordEventWrapper extends RecordEventWrapper {
         this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), dCubingInmem);
         this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), dHfileConvert);
     }
-
-    private void setDependentStats() {
-        Long sourceSize = (Long) this.metricsEvent.get(JobPropertyEnum.SOURCE_SIZE.toString());
-        if (sourceSize != null && sourceSize != 0) {
-            if (sourceSize < MIN_SOURCE_SIZE) {
-                sourceSize = MIN_SOURCE_SIZE;
-            }
-            this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(),
-                    ((Long) this.metricsEvent.get(JobPropertyEnum.BUILD_DURATION.toString())
-                            - (Long) this.metricsEvent.get(JobPropertyEnum.WAIT_RESOURCE_TIME.toString())) * 1.0
-                            / sourceSize);
-        } else {
-            this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 0.0);
-        }
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index 1ddc2bd..52d3d86 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -51,6 +51,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-storage</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
 
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 466f706..c4b6e12 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -48,9 +48,15 @@ import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metrics.MetricsManager;
+import org.apache.kylin.metrics.job.ExceptionRecordEventWrapper;
+import org.apache.kylin.metrics.job.JobRecordEventWrapper;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
+
 /**
  */
 public class CubingJob extends DefaultChainedExecutable {
@@ -61,6 +67,35 @@ public class CubingJob extends DefaultChainedExecutable {
         LAYER, INMEM
     }
 
+    public enum CubingJobTypeEnum { // kind of cubing job; each constant carries its upper-case name
+        BUILD("BUILD"), MERGE("MERGE");
+        // Name returned by toString() and matched, ignoring case, by getByName().
+        private final String name;
+
+        CubingJobTypeEnum(String name) {
+            this.name = name;
+        }
+        @Override
+        public String toString() {
+            return name;
+        }
+        // Looks up a constant by name, ignoring case; returns null for a null, empty or unknown name.
+        public static CubingJobTypeEnum getByName(String name) {
+            if (Strings.isNullOrEmpty(name)) {
+                return null;
+            }
+            for (CubingJobTypeEnum jobTypeEnum : CubingJobTypeEnum.values()) {
+                if (jobTypeEnum.name.equalsIgnoreCase(name)) { // locale-independent, unlike name.toUpperCase()
+                    return jobTypeEnum;
+                }
+            }
+            return null;
+        }
+    }
+
+    //32MB per block created by the first step
+    public static final long MIN_SOURCE_SIZE = 33554432L;
+
     // KEYS of Output.extraInfo map, info passed across job steps
     public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
     public static final String SOURCE_SIZE_BYTES = "sourceSizeBytes";
@@ -68,13 +103,14 @@ public class CubingJob extends DefaultChainedExecutable {
     public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
     private static final String DEPLOY_ENV_NAME = "envName";
     private static final String PROJECT_INSTANCE_NAME = "projectName";
+    private static final String JOB_TYPE = "jobType";
 
     public static CubingJob createBuildJob(CubeSegment seg, String submitter, JobEngineConfig config) {
-        return initCubingJob(seg, "BUILD", submitter, config);
+        return initCubingJob(seg, CubingJobTypeEnum.BUILD.toString(), submitter, config);
     }
 
     public static CubingJob createMergeJob(CubeSegment seg, String submitter, JobEngineConfig config) {
-        return initCubingJob(seg, "MERGE", submitter, config);
+        return initCubingJob(seg, CubingJobTypeEnum.MERGE.toString(), submitter, config);
     }
 
     private static CubingJob initCubingJob(CubeSegment seg, String jobType, String submitter, JobEngineConfig config) {
@@ -99,6 +135,7 @@ public class CubingJob extends DefaultChainedExecutable {
         format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
         result.setDeployEnvName(kylinConfig.getDeployEnv());
         result.setProjectName(projList.get(0).getName());
+        result.setJobType(jobType);
         CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), result.getParams());
         CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
         result.setName(jobType + " CUBE - " + seg.getCubeInstance().getName() + " - " + seg.getName() + " - "
@@ -128,6 +165,14 @@ public class CubingJob extends DefaultChainedExecutable {
         return getParam(PROJECT_INSTANCE_NAME);
     }
 
+    public String getJobType() {
+        return getParam(JOB_TYPE);
+    }
+
+    void setJobType(String jobType) {
+        setParam(JOB_TYPE, jobType);
+    }
+
     @Override
     protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) {
         CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig())
@@ -197,6 +242,53 @@ public class CubingJob extends DefaultChainedExecutable {
         super.onExecuteFinished(result, executableContext);
     }
 
+    protected void onStatusChange(ExecutableContext context, ExecuteResult result, ExecutableState state) {
+        super.onStatusChange(context, result, state);
+
+        /**
+         * Report job related metrics: a job record when state is SUCCEED, an exception record when it is ERROR.
+         */
+        if (state == ExecutableState.SUCCEED) {
+            JobRecordEventWrapper jobRecordEventWrapper = new JobRecordEventWrapper(
+                    new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob()));
+            jobRecordEventWrapper.setWrapper(ProjectInstance.getNormalizedProjectName(getProjectName()),
+                    CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(),
+                    getAlgorithm() == null ? "NULL" : getAlgorithm().toString());
+            long tableSize = findSourceSizeBytes();
+            long buildDuration = getDuration();
+            long waitResourceTime = getMapReduceWaitTime();
+            jobRecordEventWrapper.setStats(tableSize, findCubeSizeBytes(), buildDuration, waitResourceTime,
+                    getPerBytesTimeCost(tableSize, buildDuration - waitResourceTime));
+            if (CubingJobTypeEnum.getByName(getJobType()) == CubingJobTypeEnum.BUILD) { // step durations: BUILD only
+                // NOTE(review): getTaskByName(...) results are dereferenced unchecked; confirm all four steps exist.
+                jobRecordEventWrapper.setStepStats(
+                        getTaskByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS).getDuration(), //
+                        getTaskByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY).getDuration(), //
+                        getTaskByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE).getDuration(), //
+                        getTaskByName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE).getDuration());
+            }
+            MetricsManager.getInstance().update(jobRecordEventWrapper.getMetricsRecord());
+        } else if (state == ExecutableState.ERROR) {
+            ExceptionRecordEventWrapper exceptionRecordEventWrapper = new ExceptionRecordEventWrapper(
+                    new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException()));
+            exceptionRecordEventWrapper.setWrapper(ProjectInstance.getNormalizedProjectName(getProjectName()),
+                    CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(),
+                    getAlgorithm() == null ? "NULL" : getAlgorithm().toString(),
+                    result.getThrowable() != null ? result.getThrowable().getClass() : Exception.class);
+            MetricsManager.getInstance().update(exceptionRecordEventWrapper.getMetricsRecord());
+        }
+        // Any other state: nothing is reported here.
+    }
+
+    private double getPerBytesTimeCost(long size, long timeCost) { // timeCost divided by size, as a double
+        if (size <= 0) { // no (or invalid) size: report a cost of 0 instead of dividing
+            return 0;
+        }
+        if (size < MIN_SOURCE_SIZE) { // sizes below MIN_SOURCE_SIZE are treated as MIN_SOURCE_SIZE
+            size = MIN_SOURCE_SIZE;
+        }
+        return timeCost * 1.0 / size; // "* 1.0" forces floating-point division
+    }
+
     /**
      * build fail because the metadata store has problem.
      * @param exception

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
index ce19500..ddbcc99 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Constructor;
 
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.engine.mr.exception.HadoopShellException;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -68,13 +69,14 @@ public class HadoopShellExecutable extends AbstractExecutable {
                 result = 2;
             }
             log.append("result code:").append(result);
-            return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()) : new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
+            return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString())
+                    : new ExecuteResult(ExecuteResult.State.FAILED, new HadoopShellException(log.toString()));
         } catch (ReflectiveOperationException e) {
             logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         } catch (Exception e) {
             logger.error("error execute " + this.toString(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index af02364..27124b3 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -43,6 +43,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.exception.MapReduceException;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -138,7 +139,7 @@ public class MapReduceExecutable extends AbstractExecutable {
                     ex.printStackTrace(new PrintWriter(stringWriter));
                     log.append(stringWriter.toString()).append("\n");
                     log.append("result code:").append(2);
-                    return new ExecuteResult(ExecuteResult.State.ERROR, log.toString());
+                    return new ExecuteResult(ex, log.toString());
                 }
                 job = hadoopJob.getJob();
             }
@@ -167,7 +168,7 @@ public class MapReduceExecutable extends AbstractExecutable {
                     if (status == JobStepStatusEnum.FINISHED) {
                         return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
                     } else {
-                        return new ExecuteResult(ExecuteResult.State.FAILED, output.toString());
+                        return new ExecuteResult(ExecuteResult.State.FAILED, new MapReduceException(output.toString()));
                     }
                 }
                 Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000L);
@@ -190,10 +191,10 @@ public class MapReduceExecutable extends AbstractExecutable {
 
         } catch (ReflectiveOperationException e) {
             logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         } catch (Exception e) {
             logger.error("error execute " + this.toString(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java
new file mode 100644
index 0000000..23d4a3a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.exception;
+
+/**
+ */
+public class HadoopShellException extends Exception {
+
+    public HadoopShellException() {
+    }
+
+    public HadoopShellException(String message) {
+        super(message);
+    }
+
+    public HadoopShellException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public HadoopShellException(Throwable cause) {
+        super(cause);
+    }
+
+    public HadoopShellException(String message, Throwable cause, boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java
new file mode 100644
index 0000000..fc047fe
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.exception;
+
+/**
+ */
+public class MapReduceException extends Exception {
+
+    public MapReduceException() {
+    }
+
+    public MapReduceException(String message) {
+        super(message);
+    }
+
+    public MapReduceException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MapReduceException(Throwable cause) {
+        super(cause);
+    }
+
+    public MapReduceException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java
new file mode 100644
index 0000000..3e8b59e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.exception;
+
+/**
+ */
+public class SegmentNotFoundException extends Exception {
+
+    public SegmentNotFoundException() {
+    }
+
+    public SegmentNotFoundException(String message) {
+        super(message);
+    }
+
+    public SegmentNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SegmentNotFoundException(Throwable cause) {
+        super(cause);
+    }
+
+    public SegmentNotFoundException(String message, Throwable cause, boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index 58b2c02..87e95d8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -72,7 +72,7 @@ public class MergeDictionaryStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to merge dictionary or lookup snapshots", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 04d8231..ccebbb2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -136,7 +136,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to merge cuboid statistics", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index f15819f..f18de0e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -80,7 +80,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to save cuboid statistics", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index dd221f1..a53071a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -76,7 +76,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to update cube after build", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index add5c42..86e6080 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -26,6 +26,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.exception.SegmentNotFoundException;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -51,7 +52,8 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
 
         CubeSegment mergedSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
         if (mergedSegment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()));
+            return new ExecuteResult(ExecuteResult.State.FAILED, new SegmentNotFoundException(
+                    "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams())));
         }
 
         CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
@@ -60,7 +62,8 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         // collect source statistics
         List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams());
         if (mergingSegmentIds.isEmpty()) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
+            return new ExecuteResult(ExecuteResult.State.FAILED,
+                    new SegmentNotFoundException("there are no merging segments"));
         }
         long sourceCount = 0L;
         long sourceSize = 0L;
@@ -82,7 +85,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED);
         } catch (IOException e) {
             logger.error("fail to update cube after merge", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 7f4b377..f63e999 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -154,7 +154,7 @@ public class SparkExecutable extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
         } catch (Exception e) {
             logger.error("error run spark job:", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index dc85c52..476068b 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -73,7 +73,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
 
         } catch (Exception e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+            return new ExecuteResult(e, stepLogger.getBufferedLog());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 12916ec..38b8794 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -350,7 +350,7 @@ public class HiveMRInput implements IMRInput {
 
             } catch (Exception e) {
                 logger.error("job:" + getId() + " execute finished with exception", e);
-                return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+                return new ExecuteResult(e, stepLogger.getBufferedLog());
             }
         }
 
@@ -392,7 +392,7 @@ public class HiveMRInput implements IMRInput {
                 //output.append(cleanUpHiveViewIntermediateTable(config));
             } catch (IOException e) {
                 logger.error("job:" + getId() + " execute finished with exception", e);
-                return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+                return new ExecuteResult(e, e.getMessage());
             }
 
             return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java b/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java
new file mode 100644
index 0000000..10a11c0
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive.exception;
+
+/**
+ */
+public class SegmentEmptyException extends Exception {
+
+    public SegmentEmptyException() {
+    }
+
+    public SegmentEmptyException(String message) {
+        super(message);
+    }
+
+    public SegmentEmptyException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SegmentEmptyException(Throwable cause) {
+        super(cause);
+    }
+
+    public SegmentEmptyException(String message, Throwable cause, boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
index 63593c0..4025457 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
@@ -62,7 +62,7 @@ public class CmdStep extends AbstractExecutable {
 
         } catch (Exception e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+            return new ExecuteResult(e, stepLogger.getBufferedLog());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
index 8a6c90f..a67dc37 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
@@ -62,7 +62,7 @@ public class HiveCmdStep extends AbstractExecutable {
 
         } catch (Exception e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+            return new ExecuteResult(e, stepLogger.getBufferedLog());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 5bce4e7..c6914c3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -222,7 +222,7 @@ public class KafkaMRInput implements IMRInput {
                 rmdirOnHDFS(getDataPath());
             } catch (IOException e) {
                 logger.error("job:" + getId() + " execute finished with exception", e);
-                return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+                return new ExecuteResult(e, e.getMessage());
             }
 
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + getDataPath() + " is dropped.\n");

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index e3a7586..406d92e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -76,7 +76,7 @@ public class MergeOffsetStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
index df3cf08..c454e60 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
@@ -78,7 +78,7 @@ public class DeprecatedGCStep extends AbstractExecutable {
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
             output.append("\n").append(e.getLocalizedMessage());
-            return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+            return new ExecuteResult(e, output.toString());
         }
 
         return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 86e8e6b..b9b5fe6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -67,7 +67,7 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
             output.append("\n").append(e.getLocalizedMessage());
-            return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+            return new ExecuteResult(e, output.toString());
         }
 
         return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());

http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
index 2f7e164..7371e08 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
@@ -95,7 +95,7 @@ public class MergeGCStep extends AbstractExecutable {
             } catch (IOException e) {
                 output.append("Got error when drop HBase table, exiting... \n");
                 // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
-                return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
+                return new ExecuteResult(e, output.append(e.getLocalizedMessage()).toString());
             } finally {
                 if (admin != null)
                     try {