You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/09 05:16:34 UTC

incubator-kylin git commit: KYLIN-802 patch for ii streaming build's last slice

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 d853c7a31 -> 43123e614


KYLIN-802 patch for ii streaming build's last slice


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/43123e61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/43123e61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/43123e61

Branch: refs/heads/0.8.0
Commit: 43123e61495a4d6f30359e2febfaefbd52e686dd
Parents: d853c7a
Author: honma <ho...@ebay.com>
Authored: Tue Jun 9 11:15:43 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 9 11:15:43 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/PeriodicalStreamBuilderTest.java     | 2 +-
 .../main/java/org/apache/kylin/streaming/BatchCondition.java | 4 +---
 .../org/apache/kylin/streaming/LimitedSizeCondition.java     | 8 ++++++--
 .../main/java/org/apache/kylin/streaming/StreamFetcher.java  | 4 ++++
 4 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/43123e61/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
index 0eec947..d53d419 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
@@ -95,7 +95,7 @@ public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase {
         logger.info("prepare to add StreamMessage");
         while (true) {
             long ts = System.currentTimeMillis();
-            if (ts > timeout + interval) {
+            if (ts >= timeout + interval) {
                 break;
             }
             if (ts >= nextPeriodStart && ts < timeout) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/43123e61/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
index 4979039..54f966c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
@@ -5,9 +5,7 @@ package org.apache.kylin.streaming;
 public interface BatchCondition {
 
     enum Result {
-        ACCEPT,
-        REJECT,
-        DISCARD
+        ACCEPT, REJECT, DISCARD,LAST_ACCEPT_FOR_BATCH
     }
 
     Result apply(ParsedStreamMessage message);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/43123e61/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
index c99c75b..e5f0fc4 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
@@ -16,10 +16,14 @@ public class LimitedSizeCondition implements BatchCondition {
     public Result apply(ParsedStreamMessage message) {
         if (count < limit) {
             count++;
-            return Result.ACCEPT;
+
+            if (count == limit) {
+                return Result.LAST_ACCEPT_FOR_BATCH;
+            } else {
+                return Result.ACCEPT;
+            }
         } else {
             return Result.REJECT;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/43123e61/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
index 4f29610..afe6d79 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
@@ -84,6 +84,10 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
                     if (result == BatchCondition.Result.ACCEPT) {
                         streamMessageQueue.take();
                         microStreamBatch.add(parsedStreamMessage);
+                    } else if (result == BatchCondition.Result.LAST_ACCEPT_FOR_BATCH) {
+                        streamMessageQueue.take();
+                        microStreamBatch.add(parsedStreamMessage);
+                        return microStreamBatch;
                     } else if (result == BatchCondition.Result.DISCARD) {
                         streamMessageQueue.take();
                     } else if (result == BatchCondition.Result.REJECT) {