You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/27 05:52:39 UTC
[09/14] kylin git commit: Revert "Revert "KYLIN-1762 discard job when
no stream message""
Revert "Revert "KYLIN-1762 discard job when no stream message""
This reverts commit da5ba276b972f8b3c0d220252e74ac2ff73298fc.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/25f8ffc0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/25f8ffc0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/25f8ffc0
Branch: refs/heads/master
Commit: 25f8ffc0ee16b1702723d5530b22084a608fed9a
Parents: ae3d7e4
Author: shaofengshi <sh...@apache.org>
Authored: Sat Sep 24 14:57:01 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 27 10:17:40 2016 +0800
----------------------------------------------------------------------
.../job/execution/DefaultChainedExecutable.java | 6 +++
.../kylin/source/kafka/SeekOffsetStep.java | 45 +++++++++++++++-----
2 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/25f8ffc0/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 753b389..39a5f4f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -88,6 +88,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
boolean allSucceed = true;
boolean hasError = false;
boolean hasRunning = false;
+ boolean hasDiscarded = false;
for (Executable task : jobs) {
final ExecutableState status = task.getStatus();
if (status == ExecutableState.ERROR) {
@@ -99,6 +100,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
if (status == ExecutableState.RUNNING) {
hasRunning = true;
}
+ if (status == ExecutableState.DISCARDED) {
+ hasDiscarded = true;
+ }
}
if (allSucceed) {
setEndTime(System.currentTimeMillis());
@@ -110,6 +114,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
notifyUserStatusChange(executableContext, ExecutableState.ERROR);
} else if (hasRunning) {
jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ } else if (hasDiscarded) {
+ jobService.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null);
} else {
jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/25f8ffc0/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
index 5dca93f..479f1b8 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -17,6 +17,10 @@
*/
package org.apache.kylin.source.kafka;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
+import org.apache.commons.math3.util.MathUtils;
import org.apache.kylin.source.kafka.util.KafkaClient;
import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -34,6 +38,7 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -101,19 +106,39 @@ public class SeekOffsetStep extends AbstractExecutable {
}
}
- KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
- KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
+ long totalStartOffset = 0, totalEndOffset = 0;
+ for (Long v : startOffsets.values()) {
+ totalStartOffset += v;
+ }
+ for (Long v : endOffsets.values()) {
+ totalEndOffset += v;
+ }
- segment.setName(CubeSegment.makeSegmentName(0, 0, segment.getSourceOffsetStart(), segment.getSourceOffsetEnd()));
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
+ if (totalEndOffset > totalStartOffset) {
+ KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
+ KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
+ segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToUpdateSegs(segment);
+ try {
+ cubeManager.updateCube(cubeBuilder);
+ } catch (IOException e) {
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to update cube segment offset", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ } else {
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToRemoveSegs(segment);
+ try {
+ cubeManager.updateCube(cubeBuilder);
+ } catch (IOException e) {
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
}
+
+
}
}