You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/09/01 08:34:17 UTC

[4/4] incubator-kylin git commit: KYLIN-984 Behavior change in streaming data consuming

KYLIN-984 Behavior change in streaming data consuming


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ab2abeec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ab2abeec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ab2abeec

Branch: refs/heads/0.8
Commit: ab2abeec50cc6f924d21da3f8a01dcf5a5c7c650
Parents: c73f5de
Author: honma <ho...@ebay.com>
Authored: Tue Sep 1 14:01:07 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Sep 1 14:14:04 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/streaming/BootstrapConfig.java | 12 +++++++++++-
 .../apache/kylin/job/streaming/StreamingBootstrap.java  |  8 ++++++++
 .../org/apache/kylin/job/streaming/StreamingCLI.java    |  3 +++
 .../java/org/apache/kylin/streaming/StreamFetcher.java  |  8 ++------
 4 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ab2abeec/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
index ceff3c1..029d4d2 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
@@ -10,10 +10,20 @@ public class BootstrapConfig {
     //one off default value set to true
     private boolean oneOff = true;
     private long start = 0L;
-    private long end = 0L;
+    private long end = 0L; 
+    private long margin = 0L;
+    
 
     private boolean fillGap;
 
+    public long getMargin() {
+        return margin;
+    }
+
+    public void setMargin(long margin) {
+        this.margin = margin;
+    }
+
     public boolean isOneOff() {
         return oneOff;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ab2abeec/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 0811451..b7a8335 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -135,6 +135,14 @@ public class StreamingBootstrap {
         final String streaming = bootstrapConfig.getStreaming();
         Preconditions.checkNotNull(streaming, "streaming name cannot be empty");
         final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streaming);
+
+        if (bootstrapConfig.getMargin() != 0L) {
+            streamingConfig.setMargin(bootstrapConfig.getMargin());
+            logger.info("Margin is overwrite to " + streamingConfig.getMargin());
+        } else {
+            logger.info("Margin is default value: " + streamingConfig.getMargin());
+        }
+
         Preconditions.checkArgument(streamingConfig != null, "cannot find kafka config:" + streaming);
 
         if (!StringUtils.isEmpty(streamingConfig.getIiName())) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ab2abeec/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index 78b1e93..8346ec0 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -79,6 +79,9 @@ public class StreamingCLI {
                 case "-fillGap":
                     bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
                     break;
+                case "-margin":
+                    bootstrapConfig.setMargin(Long.parseLong(args[++i]));
+                    break;
                 default:
                     logger.warn("ignore this arg:" + argName);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ab2abeec/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
index f78389e..85d09be 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
@@ -59,14 +59,10 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
                     microStreamBatch = new MicroStreamBatch(partitionId);
                     clearCounter();
                 }
-                StreamMessage streamMessage = peek(streamMessageQueue, 30000);
+                StreamMessage streamMessage = peek(streamMessageQueue, 60000);
                 if (streamMessage == null) {
                     logger.info("The stream queue is drained, current available stream count: " + microStreamBatch.size());
-                    if (!microStreamBatch.isEmpty()) {
-                        return microStreamBatch;
-                    } else {
-                        continue;
-                    }
+                    return microStreamBatch;
                 }
                 if (streamMessage.getOffset() < 0) {
                     logger.warn("streaming encountered EOF, stop building");