You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/09/07 13:20:40 UTC
[05/28] incubator-kylin git commit: KYLIN-984 Behavior change in
streaming data consuming
KYLIN-984 Behavior change in streaming data consuming
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ab2abeec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ab2abeec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ab2abeec
Branch: refs/heads/2.x-staging
Commit: ab2abeec50cc6f924d21da3f8a01dcf5a5c7c650
Parents: c73f5de
Author: honma <ho...@ebay.com>
Authored: Tue Sep 1 14:01:07 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Sep 1 14:14:04 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/job/streaming/BootstrapConfig.java | 12 +++++++++++-
.../apache/kylin/job/streaming/StreamingBootstrap.java | 8 ++++++++
.../org/apache/kylin/job/streaming/StreamingCLI.java | 3 +++
.../java/org/apache/kylin/streaming/StreamFetcher.java | 8 ++------
4 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ab2abeec/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
index ceff3c1..029d4d2 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
@@ -10,10 +10,20 @@ public class BootstrapConfig {
//one off default value set to true
private boolean oneOff = true;
private long start = 0L;
- private long end = 0L;
+ private long end = 0L;
+ private long margin = 0L;
+
private boolean fillGap;
+ public long getMargin() {
+ return margin;
+ }
+
+ public void setMargin(long margin) {
+ this.margin = margin;
+ }
+
public boolean isOneOff() {
return oneOff;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ab2abeec/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 0811451..b7a8335 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -135,6 +135,14 @@ public class StreamingBootstrap {
final String streaming = bootstrapConfig.getStreaming();
Preconditions.checkNotNull(streaming, "streaming name cannot be empty");
final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streaming);
+
+ if (bootstrapConfig.getMargin() != 0L) {
+ streamingConfig.setMargin(bootstrapConfig.getMargin());
+ logger.info("Margin is overwrite to " + streamingConfig.getMargin());
+ } else {
+ logger.info("Margin is default value: " + streamingConfig.getMargin());
+ }
+
Preconditions.checkArgument(streamingConfig != null, "cannot find kafka config:" + streaming);
if (!StringUtils.isEmpty(streamingConfig.getIiName())) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ab2abeec/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index 78b1e93..8346ec0 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -79,6 +79,9 @@ public class StreamingCLI {
case "-fillGap":
bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
break;
+ case "-margin":
+ bootstrapConfig.setMargin(Long.parseLong(args[++i]));
+ break;
default:
logger.warn("ignore this arg:" + argName);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ab2abeec/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
index f78389e..85d09be 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
@@ -59,14 +59,10 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
microStreamBatch = new MicroStreamBatch(partitionId);
clearCounter();
}
- StreamMessage streamMessage = peek(streamMessageQueue, 30000);
+ StreamMessage streamMessage = peek(streamMessageQueue, 60000);
if (streamMessage == null) {
logger.info("The stream queue is drained, current available stream count: " + microStreamBatch.size());
- if (!microStreamBatch.isEmpty()) {
- return microStreamBatch;
- } else {
- continue;
- }
+ return microStreamBatch;
}
if (streamMessage.getOffset() < 0) {
logger.warn("streaming encountered EOF, stop building");