You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/09 05:16:34 UTC
incubator-kylin git commit: KYLIN-802 patch for ii streaming build's last slice
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 d853c7a31 -> 43123e614
KYLIN-802 patch for ii streaming build's last slice
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/43123e61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/43123e61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/43123e61
Branch: refs/heads/0.8.0
Commit: 43123e61495a4d6f30359e2febfaefbd52e686dd
Parents: d853c7a
Author: honma <ho...@ebay.com>
Authored: Tue Jun 9 11:15:43 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 9 11:15:43 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/PeriodicalStreamBuilderTest.java | 2 +-
.../main/java/org/apache/kylin/streaming/BatchCondition.java | 4 +---
.../org/apache/kylin/streaming/LimitedSizeCondition.java | 8 ++++++--
.../main/java/org/apache/kylin/streaming/StreamFetcher.java | 4 ++++
4 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/43123e61/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
index 0eec947..d53d419 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
@@ -95,7 +95,7 @@ public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase {
logger.info("prepare to add StreamMessage");
while (true) {
long ts = System.currentTimeMillis();
- if (ts > timeout + interval) {
+ if (ts >= timeout + interval) {
break;
}
if (ts >= nextPeriodStart && ts < timeout) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/43123e61/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
index 4979039..54f966c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
@@ -5,9 +5,7 @@ package org.apache.kylin.streaming;
public interface BatchCondition {
enum Result {
- ACCEPT,
- REJECT,
- DISCARD
+ ACCEPT, REJECT, DISCARD,LAST_ACCEPT_FOR_BATCH
}
Result apply(ParsedStreamMessage message);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/43123e61/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
index c99c75b..e5f0fc4 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
@@ -16,10 +16,14 @@ public class LimitedSizeCondition implements BatchCondition {
public Result apply(ParsedStreamMessage message) {
if (count < limit) {
count++;
- return Result.ACCEPT;
+
+ if (count == limit) {
+ return Result.LAST_ACCEPT_FOR_BATCH;
+ } else {
+ return Result.ACCEPT;
+ }
} else {
return Result.REJECT;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/43123e61/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
index 4f29610..afe6d79 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
@@ -84,6 +84,10 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
if (result == BatchCondition.Result.ACCEPT) {
streamMessageQueue.take();
microStreamBatch.add(parsedStreamMessage);
+ } else if (result == BatchCondition.Result.LAST_ACCEPT_FOR_BATCH) {
+ streamMessageQueue.take();
+ microStreamBatch.add(parsedStreamMessage);
+ return microStreamBatch;
} else if (result == BatchCondition.Result.DISCARD) {
streamMessageQueue.take();
} else if (result == BatchCondition.Result.REJECT) {