You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/04 08:54:06 UTC

[12/31] kylin git commit: Revert "Revert "KYLIN-1762 discard job when no stream message""

Revert "Revert "KYLIN-1762 discard job when no stream message""

This reverts commit da5ba276b972f8b3c0d220252e74ac2ff73298fc.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/25f8ffc0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/25f8ffc0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/25f8ffc0

Branch: refs/heads/master-cdh5.7
Commit: 25f8ffc0ee16b1702723d5530b22084a608fed9a
Parents: ae3d7e4
Author: shaofengshi <sh...@apache.org>
Authored: Sat Sep 24 14:57:01 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 27 10:17:40 2016 +0800

----------------------------------------------------------------------
 .../job/execution/DefaultChainedExecutable.java |  6 +++
 .../kylin/source/kafka/SeekOffsetStep.java      | 45 +++++++++++++++-----
 2 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/25f8ffc0/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 753b389..39a5f4f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -88,6 +88,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
             boolean allSucceed = true;
             boolean hasError = false;
             boolean hasRunning = false;
+            boolean hasDiscarded = false;
             for (Executable task : jobs) {
                 final ExecutableState status = task.getStatus();
                 if (status == ExecutableState.ERROR) {
@@ -99,6 +100,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                 if (status == ExecutableState.RUNNING) {
                     hasRunning = true;
                 }
+                if (status == ExecutableState.DISCARDED) {
+                    hasDiscarded = true;
+                }
             }
             if (allSucceed) {
                 setEndTime(System.currentTimeMillis());
@@ -110,6 +114,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                 notifyUserStatusChange(executableContext, ExecutableState.ERROR);
             } else if (hasRunning) {
                 jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+            } else if (hasDiscarded) {
+                jobService.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null);
             } else {
                 jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/25f8ffc0/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
index 5dca93f..479f1b8 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -17,6 +17,10 @@
 */
 package org.apache.kylin.source.kafka;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
+import org.apache.commons.math3.util.MathUtils;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -34,6 +38,7 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -101,19 +106,39 @@ public class SeekOffsetStep extends AbstractExecutable {
             }
         }
 
-        KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
-        KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
+        long totalStartOffset = 0, totalEndOffset = 0;
+        for (Long v : startOffsets.values()) {
+            totalStartOffset += v;
+        }
+        for (Long v : endOffsets.values()) {
+            totalEndOffset += v;
+        }
 
-        segment.setName(CubeSegment.makeSegmentName(0, 0, segment.getSourceOffsetStart(), segment.getSourceOffsetEnd()));
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToUpdateSegs(segment);
-        try {
-            cubeManager.updateCube(cubeBuilder);
+        if (totalEndOffset > totalStartOffset) {
+            KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
+            KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
+            segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToUpdateSegs(segment);
+            try {
+                cubeManager.updateCube(cubeBuilder);
+            } catch (IOException e) {
+                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            }
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        } else {
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToRemoveSegs(segment);
+            try {
+                cubeManager.updateCube(cubeBuilder);
+            } catch (IOException e) {
+                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            }
+
+            return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
         }
+
+
     }
 
 }