You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/10/27 14:05:53 UTC
[3/6] kylin git commit: APACHE-KYLIN-2723: collect job related metrics
APACHE-KYLIN-2723: collect job related metrics
Signed-off-by: lidongsjtu <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ff0d0edc
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ff0d0edc
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ff0d0edc
Branch: refs/heads/master
Commit: ff0d0edc4f71a112dac347656a6c5bdc786975a9
Parents: 4e3b515
Author: Zhong <nj...@apache.org>
Authored: Thu Aug 10 17:47:32 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Fri Oct 27 21:58:08 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 9 ++
.../kylin/job/common/ShellExecutable.java | 7 +-
.../kylin/job/exception/ShellException.java | 43 +++++++++
.../job/execution/DefaultChainedExecutable.java | 18 ++--
.../kylin/job/execution/ExecuteResult.java | 30 ++++++
.../kylin/metadata/project/ProjectInstance.java | 7 ++
.../metrics/job/JobRecordEventWrapper.java | 23 +----
engine-mr/pom.xml | 4 +
.../org/apache/kylin/engine/mr/CubingJob.java | 96 +++++++++++++++++++-
.../engine/mr/common/HadoopShellExecutable.java | 8 +-
.../engine/mr/common/MapReduceExecutable.java | 9 +-
.../mr/exception/HadoopShellException.java | 44 +++++++++
.../engine/mr/exception/MapReduceException.java | 43 +++++++++
.../mr/exception/SegmentNotFoundException.java | 44 +++++++++
.../engine/mr/steps/MergeDictionaryStep.java | 2 +-
.../engine/mr/steps/MergeStatisticsStep.java | 2 +-
.../engine/mr/steps/SaveStatisticsStep.java | 2 +-
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 2 +-
.../mr/steps/UpdateCubeInfoAfterMergeStep.java | 9 +-
.../kylin/engine/spark/SparkExecutable.java | 2 +-
.../source/hive/CreateFlatHiveTableStep.java | 2 +-
.../apache/kylin/source/hive/HiveMRInput.java | 4 +-
.../hive/exception/SegmentEmptyException.java | 44 +++++++++
.../org/apache/kylin/source/jdbc/CmdStep.java | 2 +-
.../apache/kylin/source/jdbc/HiveCmdStep.java | 2 +-
.../apache/kylin/source/kafka/KafkaMRInput.java | 2 +-
.../kylin/source/kafka/job/MergeOffsetStep.java | 2 +-
.../storage/hbase/steps/DeprecatedGCStep.java | 2 +-
.../steps/HDFSPathGarbageCollectionStep.java | 2 +-
.../kylin/storage/hbase/steps/MergeGCStep.java | 2 +-
30 files changed, 413 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index c988b9f..3a9a05e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1350,6 +1350,15 @@ abstract public class KylinConfigBase implements Serializable {
return getDeployEnv();
}
+ public String getKylinMetricsSubjectJob() {
+ return getOptional("kylin.core.metrics.subject-job", "METRICS_JOB") + "_" + getKylinMetricsSubjectSuffix();
+ }
+
+ public String getKylinMetricsSubjectJobException() {
+ return getOptional("kylin.core.metrics.subject-job-exception", "METRICS_JOB_EXCEPTION") + "_"
+ + getKylinMetricsSubjectSuffix();
+ }
+
public String getKylinMetricsSubjectQuery() {
return getOptional("kylin.core.metrics.subject-query", "METRICS_QUERY") + "_" + getKylinMetricsSubjectSuffix();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
index 9f431b0..070ac23 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
@@ -19,8 +19,10 @@
package org.apache.kylin.job.common;
import java.io.IOException;
+
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.ShellException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
@@ -45,10 +47,11 @@ public class ShellExecutable extends AbstractExecutable {
final PatternedLogger patternedLogger = new PatternedLogger(logger);
final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), patternedLogger);
getManager().addJobInfo(getId(), patternedLogger.getInfo());
- return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond());
+ return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED,
+ new ShellException(result.getSecond()));
} catch (IOException e) {
logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java b/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java
new file mode 100644
index 0000000..443bd9b
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ */
+public class ShellException extends Exception {
+
+ public ShellException() {
+ }
+
+ public ShellException(String message) {
+ super(message);
+ }
+
+ public ShellException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ShellException(Throwable cause) {
+ super(cause);
+ }
+
+ public ShellException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index cbd49ae..77d5efa 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -64,7 +64,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
return subTask.execute(context);
}
}
- return new ExecuteResult(ExecuteResult.State.SUCCEED, null);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED);
}
@Override
@@ -82,7 +82,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
@Override
protected void onExecuteError(Throwable exception, ExecutableContext executableContext) {
super.onExecuteError(exception, executableContext);
- notifyUserStatusChange(executableContext, ExecutableState.ERROR);
+ onStatusChange(executableContext, new ExecuteResult(exception), ExecutableState.ERROR);
}
@Override
@@ -91,10 +91,10 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
if (isDiscarded()) {
setEndTime(System.currentTimeMillis());
- notifyUserStatusChange(executableContext, ExecutableState.DISCARDED);
+ onStatusChange(executableContext, result, ExecutableState.DISCARDED);
} else if (isPaused()) {
setEndTime(System.currentTimeMillis());
- notifyUserStatusChange(executableContext, ExecutableState.STOPPED);
+ onStatusChange(executableContext, result, ExecutableState.STOPPED);
} else if (result.succeed()) {
List<? extends Executable> jobs = getTasks();
boolean allSucceed = true;
@@ -119,11 +119,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
if (allSucceed) {
setEndTime(System.currentTimeMillis());
mgr.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null);
- notifyUserStatusChange(executableContext, ExecutableState.SUCCEED);
+ onStatusChange(executableContext, result, ExecutableState.SUCCEED);
} else if (hasError) {
setEndTime(System.currentTimeMillis());
mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
- notifyUserStatusChange(executableContext, ExecutableState.ERROR);
+ onStatusChange(executableContext, result, ExecutableState.ERROR);
} else if (hasRunning) {
mgr.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
} else if (hasDiscarded) {
@@ -135,10 +135,14 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
} else {
setEndTime(System.currentTimeMillis());
mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
- notifyUserStatusChange(executableContext, ExecutableState.ERROR);
+ onStatusChange(executableContext, result, ExecutableState.ERROR);
}
}
+ protected void onStatusChange(ExecutableContext context, ExecuteResult result, ExecutableState state) {
+ notifyUserStatusChange(context, state);
+ }
+
@Override
public List<AbstractExecutable> getTasks() {
return subTasks;
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
index 760a574..139c04b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
@@ -30,6 +30,14 @@ public final class ExecuteResult {
private final State state;
private final String output;
+ private final Throwable throwable;
+
+ /**
+ * Default constructor to indicate a success ExecuteResult.
+ */
+ public ExecuteResult() {
+ this(State.SUCCEED, "succeed");
+ }
public ExecuteResult(State state) {
this(state, "");
@@ -39,6 +47,24 @@ public final class ExecuteResult {
Preconditions.checkArgument(state != null, "state cannot be null");
this.state = state;
this.output = output;
+ this.throwable = null;
+ }
+
+ public ExecuteResult(State state, Throwable throwable) {
+ Preconditions.checkArgument(state != null, "state cannot be null");
+ this.state = state;
+ this.throwable = throwable;
+ this.output = throwable.getMessage();
+ }
+
+ public ExecuteResult(Throwable throwable) {
+ this(throwable, throwable.getMessage());
+ }
+
+ public ExecuteResult(Throwable throwable, String output) {
+ this.state = State.ERROR;
+ this.throwable = throwable;
+ this.output = output;
}
public State state() {
@@ -52,4 +78,8 @@ public final class ExecuteResult {
public String output() {
return output;
}
+
+ public Throwable getThrowable() {
+ return throwable;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 5f0e6a3..1f54416 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -92,6 +92,13 @@ public class ProjectInstance extends RootPersistentEntity {
return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + projectName + ".json";
}
+ public static String getNormalizedProjectName(String project) {
+ if (project == null)
+ throw new IllegalStateException("Trying to normalized a project name which is null");
+
+ return project.toUpperCase();
+ }
+
public static ProjectInstance create(String name, String owner, String description, LinkedHashMap<String, String> overrideProps, List<RealizationEntry> realizationEntries, List<String> models) {
ProjectInstance projectInstance = new ProjectInstance();
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
index 537388c..6cd197e 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
@@ -39,7 +39,7 @@ public class JobRecordEventWrapper extends RecordEventWrapper {
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), 0L);
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), 0L);
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), 0L);
- setDependentStats();
+ this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 0L);
}
public void setWrapper(String projectName, String cubeName, String jobId, String jobType, String cubingType) {
@@ -50,12 +50,13 @@ public class JobRecordEventWrapper extends RecordEventWrapper {
this.metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType);
}
- public void setStats(long tableSize, long cubeSize, long buildDuration, long waitResourceTime) {
+ public void setStats(long tableSize, long cubeSize, long buildDuration, long waitResourceTime,
+ double perBytesTimeCost) {
this.metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), tableSize);
this.metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize);
this.metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), buildDuration);
this.metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), waitResourceTime);
- setDependentStats();
+ this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), perBytesTimeCost);
}
public void setStepStats(long dColumnDistinct, long dDictBuilding, long dCubingInmem, long dHfileConvert) {
@@ -64,20 +65,4 @@ public class JobRecordEventWrapper extends RecordEventWrapper {
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), dCubingInmem);
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), dHfileConvert);
}
-
- private void setDependentStats() {
- Long sourceSize = (Long) this.metricsEvent.get(JobPropertyEnum.SOURCE_SIZE.toString());
- if (sourceSize != null && sourceSize != 0) {
- if (sourceSize < MIN_SOURCE_SIZE) {
- sourceSize = MIN_SOURCE_SIZE;
- }
- this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(),
- ((Long) this.metricsEvent.get(JobPropertyEnum.BUILD_DURATION.toString())
- - (Long) this.metricsEvent.get(JobPropertyEnum.WAIT_RESOURCE_TIME.toString())) * 1.0
- / sourceSize);
- } else {
- this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 0.0);
- }
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index 1ddc2bd..52d3d86 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -51,6 +51,10 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-storage</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-metrics</artifactId>
+ </dependency>
<!-- Env & Test -->
<dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 466f706..c4b6e12 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -48,9 +48,15 @@ import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metrics.MetricsManager;
+import org.apache.kylin.metrics.job.ExceptionRecordEventWrapper;
+import org.apache.kylin.metrics.job.JobRecordEventWrapper;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Strings;
+
/**
*/
public class CubingJob extends DefaultChainedExecutable {
@@ -61,6 +67,35 @@ public class CubingJob extends DefaultChainedExecutable {
LAYER, INMEM
}
+ public enum CubingJobTypeEnum {
+ BUILD("BUILD"), MERGE("MERGE");
+
+ private final String name;
+
+ CubingJobTypeEnum(String name) {
+ this.name = name;
+ }
+
+ public String toString() {
+ return name;
+ }
+
+ public static CubingJobTypeEnum getByName(String name) {
+ if (Strings.isNullOrEmpty(name)) {
+ return null;
+ }
+ for (CubingJobTypeEnum jobTypeEnum : CubingJobTypeEnum.values()) {
+ if (jobTypeEnum.name.equals(name.toUpperCase())) {
+ return jobTypeEnum;
+ }
+ }
+ return null;
+ }
+ }
+
+ //32MB per block created by the first step
+ public static final long MIN_SOURCE_SIZE = 33554432L;
+
// KEYS of Output.extraInfo map, info passed across job steps
public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
public static final String SOURCE_SIZE_BYTES = "sourceSizeBytes";
@@ -68,13 +103,14 @@ public class CubingJob extends DefaultChainedExecutable {
public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
private static final String DEPLOY_ENV_NAME = "envName";
private static final String PROJECT_INSTANCE_NAME = "projectName";
+ private static final String JOB_TYPE = "jobType";
public static CubingJob createBuildJob(CubeSegment seg, String submitter, JobEngineConfig config) {
- return initCubingJob(seg, "BUILD", submitter, config);
+ return initCubingJob(seg, CubingJobTypeEnum.BUILD.toString(), submitter, config);
}
public static CubingJob createMergeJob(CubeSegment seg, String submitter, JobEngineConfig config) {
- return initCubingJob(seg, "MERGE", submitter, config);
+ return initCubingJob(seg, CubingJobTypeEnum.MERGE.toString(), submitter, config);
}
private static CubingJob initCubingJob(CubeSegment seg, String jobType, String submitter, JobEngineConfig config) {
@@ -99,6 +135,7 @@ public class CubingJob extends DefaultChainedExecutable {
format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
result.setDeployEnvName(kylinConfig.getDeployEnv());
result.setProjectName(projList.get(0).getName());
+ result.setJobType(jobType);
CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), result.getParams());
CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
result.setName(jobType + " CUBE - " + seg.getCubeInstance().getName() + " - " + seg.getName() + " - "
@@ -128,6 +165,14 @@ public class CubingJob extends DefaultChainedExecutable {
return getParam(PROJECT_INSTANCE_NAME);
}
+ public String getJobType() {
+ return getParam(JOB_TYPE);
+ }
+
+ void setJobType(String jobType) {
+ setParam(JOB_TYPE, jobType);
+ }
+
@Override
protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) {
CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig())
@@ -197,6 +242,53 @@ public class CubingJob extends DefaultChainedExecutable {
super.onExecuteFinished(result, executableContext);
}
+ protected void onStatusChange(ExecutableContext context, ExecuteResult result, ExecutableState state) {
+ super.onStatusChange(context, result, state);
+
+ /**
+ * report job related metrics
+ */
+ if (state == ExecutableState.SUCCEED) {
+ JobRecordEventWrapper jobRecordEventWrapper = new JobRecordEventWrapper(
+ new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob()));
+ jobRecordEventWrapper.setWrapper(ProjectInstance.getNormalizedProjectName(getProjectName()),
+ CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(),
+ getAlgorithm() == null ? "NULL" : getAlgorithm().toString());
+ long tableSize = findSourceSizeBytes();
+ long buildDuration = getDuration();
+ long waitResourceTime = getMapReduceWaitTime();
+ jobRecordEventWrapper.setStats(tableSize, findCubeSizeBytes(), buildDuration, waitResourceTime,
+ getPerBytesTimeCost(tableSize, buildDuration - waitResourceTime));
+ if (CubingJobTypeEnum.getByName(getJobType()) == CubingJobTypeEnum.BUILD) {
+ jobRecordEventWrapper.setStepStats(
+ getTaskByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS).getDuration(), //
+ getTaskByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY).getDuration(), //
+ getTaskByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE).getDuration(), //
+ getTaskByName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE).getDuration());
+ }
+ MetricsManager.getInstance().update(jobRecordEventWrapper.getMetricsRecord());
+ } else if (state == ExecutableState.ERROR) {
+ ExceptionRecordEventWrapper exceptionRecordEventWrapper = new ExceptionRecordEventWrapper(
+ new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException()));
+ exceptionRecordEventWrapper.setWrapper(ProjectInstance.getNormalizedProjectName(getProjectName()),
+ CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(),
+ getAlgorithm() == null ? "NULL" : getAlgorithm().toString(),
+ result.getThrowable() != null ? result.getThrowable().getClass() : Exception.class);
+ MetricsManager.getInstance().update(exceptionRecordEventWrapper.getMetricsRecord());
+ }
+
+ }
+
+ private double getPerBytesTimeCost(long size, long timeCost) {
+ if (size <= 0) {
+ return 0;
+ }
+ if (size < MIN_SOURCE_SIZE) {
+ size = MIN_SOURCE_SIZE;
+ }
+ return timeCost * 1.0 / size;
+ }
+
/**
* build fail because the metadata store has problem.
* @param exception
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
index ce19500..ddbcc99 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Constructor;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.engine.mr.exception.HadoopShellException;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
@@ -68,13 +69,14 @@ public class HadoopShellExecutable extends AbstractExecutable {
result = 2;
}
log.append("result code:").append(result);
- return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()) : new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
+ return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString())
+ : new ExecuteResult(ExecuteResult.State.FAILED, new HadoopShellException(log.toString()));
} catch (ReflectiveOperationException e) {
logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
} catch (Exception e) {
logger.error("error execute " + this.toString(), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index af02364..27124b3 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -43,6 +43,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.exception.MapReduceException;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.constant.JobStepStatusEnum;
import org.apache.kylin.job.exception.ExecuteException;
@@ -138,7 +139,7 @@ public class MapReduceExecutable extends AbstractExecutable {
ex.printStackTrace(new PrintWriter(stringWriter));
log.append(stringWriter.toString()).append("\n");
log.append("result code:").append(2);
- return new ExecuteResult(ExecuteResult.State.ERROR, log.toString());
+ return new ExecuteResult(ex, log.toString());
}
job = hadoopJob.getJob();
}
@@ -167,7 +168,7 @@ public class MapReduceExecutable extends AbstractExecutable {
if (status == JobStepStatusEnum.FINISHED) {
return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
} else {
- return new ExecuteResult(ExecuteResult.State.FAILED, output.toString());
+ return new ExecuteResult(ExecuteResult.State.FAILED, new MapReduceException(output.toString()));
}
}
Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000L);
@@ -190,10 +191,10 @@ public class MapReduceExecutable extends AbstractExecutable {
} catch (ReflectiveOperationException e) {
logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
} catch (Exception e) {
logger.error("error execute " + this.toString(), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java
new file mode 100644
index 0000000..23d4a3a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.exception;
+
+/**
+ */
+public class HadoopShellException extends Exception {
+
+ public HadoopShellException() {
+ }
+
+ public HadoopShellException(String message) {
+ super(message);
+ }
+
+ public HadoopShellException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public HadoopShellException(Throwable cause) {
+ super(cause);
+ }
+
+ public HadoopShellException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java
new file mode 100644
index 0000000..fc047fe
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.exception;
+
+/**
+ */
+public class MapReduceException extends Exception {
+
+ public MapReduceException() {
+ }
+
+ public MapReduceException(String message) {
+ super(message);
+ }
+
+ public MapReduceException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MapReduceException(Throwable cause) {
+ super(cause);
+ }
+
+ public MapReduceException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java
new file mode 100644
index 0000000..3e8b59e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.exception;
+
+/**
+ */
+public class SegmentNotFoundException extends Exception {
+
+ public SegmentNotFoundException() {
+ }
+
+ public SegmentNotFoundException(String message) {
+ super(message);
+ }
+
+ public SegmentNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SegmentNotFoundException(Throwable cause) {
+ super(cause);
+ }
+
+ public SegmentNotFoundException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index 58b2c02..87e95d8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -72,7 +72,7 @@ public class MergeDictionaryStep extends AbstractExecutable {
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} catch (IOException e) {
logger.error("fail to merge dictionary or lookup snapshots", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 04d8231..ccebbb2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -136,7 +136,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} catch (IOException e) {
logger.error("fail to merge cuboid statistics", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index f15819f..f18de0e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -80,7 +80,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} catch (IOException e) {
logger.error("fail to save cuboid statistics", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index dd221f1..a53071a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -76,7 +76,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} catch (IOException e) {
logger.error("fail to update cube after build", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index add5c42..86e6080 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -26,6 +26,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.exception.SegmentNotFoundException;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
@@ -51,7 +52,8 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
CubeSegment mergedSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
if (mergedSegment == null) {
- return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()));
+ return new ExecuteResult(ExecuteResult.State.FAILED, new SegmentNotFoundException(
+ "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams())));
}
CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
@@ -60,7 +62,8 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
// collect source statistics
List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams());
if (mergingSegmentIds.isEmpty()) {
- return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
+ return new ExecuteResult(ExecuteResult.State.FAILED,
+ new SegmentNotFoundException("there are no merging segments"));
}
long sourceCount = 0L;
long sourceSize = 0L;
@@ -82,7 +85,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
return new ExecuteResult(ExecuteResult.State.SUCCEED);
} catch (IOException e) {
logger.error("fail to update cube after merge", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 7f4b377..f63e999 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -154,7 +154,7 @@ public class SparkExecutable extends AbstractExecutable {
return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
} catch (Exception e) {
logger.error("error run spark job:", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index dc85c52..476068b 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -73,7 +73,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
} catch (Exception e) {
logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+ return new ExecuteResult(e, stepLogger.getBufferedLog());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 12916ec..38b8794 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -350,7 +350,7 @@ public class HiveMRInput implements IMRInput {
} catch (Exception e) {
logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+ return new ExecuteResult(e, stepLogger.getBufferedLog());
}
}
@@ -392,7 +392,7 @@ public class HiveMRInput implements IMRInput {
//output.append(cleanUpHiveViewIntermediateTable(config));
} catch (IOException e) {
logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+ return new ExecuteResult(e, e.getMessage());
}
return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java b/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java
new file mode 100644
index 0000000..10a11c0
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive.exception;
+
+/**
+ */
+public class SegmentEmptyException extends Exception {
+
+ public SegmentEmptyException() {
+ }
+
+ public SegmentEmptyException(String message) {
+ super(message);
+ }
+
+ public SegmentEmptyException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SegmentEmptyException(Throwable cause) {
+ super(cause);
+ }
+
+ public SegmentEmptyException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
index 63593c0..4025457 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
@@ -62,7 +62,7 @@ public class CmdStep extends AbstractExecutable {
} catch (Exception e) {
logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+ return new ExecuteResult(e, stepLogger.getBufferedLog());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
index 8a6c90f..a67dc37 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
@@ -62,7 +62,7 @@ public class HiveCmdStep extends AbstractExecutable {
} catch (Exception e) {
logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+ return new ExecuteResult(e, stepLogger.getBufferedLog());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 5bce4e7..c6914c3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -222,7 +222,7 @@ public class KafkaMRInput implements IMRInput {
rmdirOnHDFS(getDataPath());
} catch (IOException e) {
logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+ return new ExecuteResult(e, e.getMessage());
}
return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + getDataPath() + " is dropped.\n");
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index e3a7586..406d92e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -76,7 +76,7 @@ public class MergeOffsetStep extends AbstractExecutable {
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} catch (IOException e) {
logger.error("fail to update cube segment offset", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return new ExecuteResult(e, e.getLocalizedMessage());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
index df3cf08..c454e60 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
@@ -78,7 +78,7 @@ public class DeprecatedGCStep extends AbstractExecutable {
} catch (IOException e) {
logger.error("job:" + getId() + " execute finished with exception", e);
output.append("\n").append(e.getLocalizedMessage());
- return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+ return new ExecuteResult(e, output.toString());
}
return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 86e8e6b..b9b5fe6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -67,7 +67,7 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
} catch (IOException e) {
logger.error("job:" + getId() + " execute finished with exception", e);
output.append("\n").append(e.getLocalizedMessage());
- return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+ return new ExecuteResult(e, output.toString());
}
return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
http://git-wip-us.apache.org/repos/asf/kylin/blob/ff0d0edc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
index 2f7e164..7371e08 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
@@ -95,7 +95,7 @@ public class MergeGCStep extends AbstractExecutable {
} catch (IOException e) {
output.append("Got error when drop HBase table, exiting... \n");
// This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
- return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
+ return new ExecuteResult(e, output.append(e.getLocalizedMessage()).toString());
} finally {
if (admin != null)
try {