You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/03/31 13:02:06 UTC
kylin git commit: KYLIN-1500: Split large gap for solving out of
memory issue when streaming_fillgap
Repository: kylin
Updated Branches:
refs/heads/1.4-rc b0388aa98 -> 25a5dcdf5
KYLIN-1500: Split large gap for solving out of memory issue when streaming_fillgap
Signed-off-by: Hongbin Ma <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/25a5dcdf
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/25a5dcdf
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/25a5dcdf
Branch: refs/heads/1.4-rc
Commit: 25a5dcdf5c5eff509b3041f69af20df52edb1c01
Parents: b0388aa
Author: yangzhong <ya...@ebay.com>
Authored: Thu Mar 31 18:23:59 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Mar 31 19:01:59 2016 +0800
----------------------------------------------------------------------
.../kylin/engine/streaming/BootstrapConfig.java | 9 +++++++
.../engine/streaming/cli/StreamingCLI.java | 26 ++++++++++++++++++--
2 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/25a5dcdf/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
index 4b51bc0..ea1e46f 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
@@ -29,6 +29,7 @@ public class BootstrapConfig {
private long end = 0L;
private boolean fillGap;
+ private long maxFillGapRange = 4 * 3600 * 1000L;
public long getStart() {
return start;
@@ -69,4 +70,12 @@ public class BootstrapConfig {
public void setFillGap(boolean fillGap) {
this.fillGap = fillGap;
}
+
+ public long getMaxFillGapRange() { // longest span one fill-gap build may cover; field default is 4 * 3600 * 1000L (presumably milliseconds - confirm)
+ return maxFillGapRange;
+ }
+
+ public void setMaxFillGapRange(long maxFillGapRange) { // stored as-is with no range check; StreamingCLI passes the parsed -maxFillGapRange argument
+ this.maxFillGapRange = maxFillGapRange;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/25a5dcdf/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index a73a6ac..ba60893 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming.cli;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
@@ -78,6 +79,9 @@ public class StreamingCLI {
case "-fillGap":
bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
break;
+ case "-maxFillGapRange":
+ bootstrapConfig.setMaxFillGapRange(Long.parseLong(args[++i]));
+ break;
default:
logger.warn("ignore this arg:" + argName);
}
@@ -88,7 +92,12 @@ public class StreamingCLI {
final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
logger.info("all gaps:" + StringUtils.join(gaps, ","));
for (Pair<Long, Long> gap : gaps) {
- startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond());
+ List<Pair<Long, Long>> splitGaps = splitGap(gap, bootstrapConfig.getMaxFillGapRange());
+ for (Pair<Long, Long> splitGap : splitGaps) {
+ logger.info("start filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
+ startOneOffCubeStreaming(bootstrapConfig.getStreaming(), splitGap.getFirst(), splitGap.getSecond());
+ logger.info("finish filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
+ }
}
} else {
startOneOffCubeStreaming(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
@@ -101,7 +110,20 @@ public class StreamingCLI {
System.exit(-1);
}
}
-
+
+    /** Splits {@code gap} into consecutive sub-ranges of at most {@code maxFillGapRange} each; throws IllegalArgumentException if the range is not positive. */
+    private static List<Pair<Long, Long>> splitGap(Pair<Long, Long> gap, long maxFillGapRange) {
+        if (maxFillGapRange <= 0) { throw new IllegalArgumentException("maxFillGapRange must be positive, but was " + maxFillGapRange); } // a non-positive step would never advance and loop forever
+        List<Pair<Long, Long>> gaps = Lists.newArrayList();
+        long startTime = gap.getFirst(), gapEnd = gap.getSecond(); // primitives avoid re-boxing on every iteration
+        while (startTime < gapEnd) {
+            long endTime = gapEnd - startTime <= maxFillGapRange ? gapEnd : startTime + maxFillGapRange; // compare the remaining length so a huge range cannot overflow startTime + range
+            gaps.add(Pair.newPair(startTime, endTime));
+            startTime = endTime;
+        }
+        return gaps;
+    }
+
private static void startOneOffCubeStreaming(String streaming, long start, long end) {
final Runnable runnable = new OneOffStreamingBuilder(streaming, start, end).build();
runnable.run();