You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/27 00:25:57 UTC
[04/50] [abbrv] kylin git commit: Revert "KYLIN-1762 discard job when
no stream message"
Revert "KYLIN-1762 discard job when no stream message"
This reverts commit 1108d9eeccecbccffea0b3f9049151672196c91a.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/da5ba276
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/da5ba276
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/da5ba276
Branch: refs/heads/1.5.x-HBase1.x
Commit: da5ba276b972f8b3c0d220252e74ac2ff73298fc
Parents: bec25b4
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Sep 19 23:50:20 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Tue Sep 20 11:43:08 2016 +0800
----------------------------------------------------------------------
.../job/execution/DefaultChainedExecutable.java | 6 ---
.../kylin/source/kafka/SeekOffsetStep.java | 45 +++++---------------
2 files changed, 10 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/da5ba276/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 39a5f4f..753b389 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -88,7 +88,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
boolean allSucceed = true;
boolean hasError = false;
boolean hasRunning = false;
- boolean hasDiscarded = false;
for (Executable task : jobs) {
final ExecutableState status = task.getStatus();
if (status == ExecutableState.ERROR) {
@@ -100,9 +99,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
if (status == ExecutableState.RUNNING) {
hasRunning = true;
}
- if (status == ExecutableState.DISCARDED) {
- hasDiscarded = true;
- }
}
if (allSucceed) {
setEndTime(System.currentTimeMillis());
@@ -114,8 +110,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
notifyUserStatusChange(executableContext, ExecutableState.ERROR);
} else if (hasRunning) {
jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
- } else if (hasDiscarded) {
- jobService.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null);
} else {
jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/da5ba276/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
index 479f1b8..5dca93f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -17,10 +17,6 @@
*/
package org.apache.kylin.source.kafka;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Maps;
-import org.apache.commons.math3.util.MathUtils;
import org.apache.kylin.source.kafka.util.KafkaClient;
import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -38,7 +34,6 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -106,39 +101,19 @@ public class SeekOffsetStep extends AbstractExecutable {
}
}
- long totalStartOffset = 0, totalEndOffset = 0;
- for (Long v : startOffsets.values()) {
- totalStartOffset += v;
- }
- for (Long v : endOffsets.values()) {
- totalEndOffset += v;
- }
+ KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
+ KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
- if (totalEndOffset > totalStartOffset) {
- KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
- KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
- segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- } catch (IOException e) {
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
+ segment.setName(CubeSegment.makeSegmentName(0, 0, segment.getSourceOffsetStart(), segment.getSourceOffsetEnd()));
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToUpdateSegs(segment);
+ try {
+ cubeManager.updateCube(cubeBuilder);
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } else {
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToRemoveSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- } catch (IOException e) {
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
-
- return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
+ } catch (IOException e) {
+ logger.error("fail to update cube segment offset", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
}
-
-
}
}