You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/04/22 06:11:39 UTC
[10/50] [abbrv] kylin git commit: KYLIN-1500: Split large gap for
solving out of memory issue when streaming_fillgap
KYLIN-1500: Split large gap for solving out of memory issue when streaming_fillgap
Signed-off-by: Hongbin Ma <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9abd421b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9abd421b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9abd421b
Branch: refs/heads/1.5.x-HBase1.1.3
Commit: 9abd421b7627413d5e7e2fafe840497326da51d4
Parents: 099ffdd
Author: yangzhong <ya...@ebay.com>
Authored: Thu Mar 31 18:06:38 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Mar 31 19:02:45 2016 +0800
----------------------------------------------------------------------
.../kylin/engine/streaming/BootstrapConfig.java | 9 +++++++
.../engine/streaming/cli/StreamingCLI.java | 26 ++++++++++++++++++--
2 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/9abd421b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
index e690e9a..35bdfa8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
@@ -27,6 +27,7 @@ public class BootstrapConfig {
private long end = 0L;
private boolean fillGap;
+ private long maxFillGapRange = 4 * 3600 * 1000L;
public long getStart() {
return start;
@@ -59,4 +60,12 @@ public class BootstrapConfig {
public void setFillGap(boolean fillGap) {
this.fillGap = fillGap;
}
+
+ public long getMaxFillGapRange() {
+ return maxFillGapRange;
+ }
+
+ public void setMaxFillGapRange(long maxFillGapRange) {
+ this.maxFillGapRange = maxFillGapRange;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9abd421b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index 0bab396..487b4b9 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming.cli;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
@@ -76,6 +77,9 @@ public class StreamingCLI {
case "-fillGap":
bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
break;
+ case "-maxFillGapRange":
+ bootstrapConfig.setMaxFillGapRange(Long.parseLong(args[++i]));
+ break;
default:
logger.warn("ignore this arg:" + argName);
}
@@ -85,7 +89,12 @@ public class StreamingCLI {
final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(bootstrapConfig.getCubeName());
logger.info("all gaps:" + StringUtils.join(gaps, ","));
for (Pair<Long, Long> gap : gaps) {
- startOneOffCubeStreaming(bootstrapConfig.getCubeName(), gap.getFirst(), gap.getSecond());
+ List<Pair<Long, Long>> splitGaps = splitGap(gap, bootstrapConfig.getMaxFillGapRange());
+ for (Pair<Long, Long> splitGap : splitGaps) {
+ logger.info("start filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
+ startOneOffCubeStreaming(bootstrapConfig.getCubeName(), splitGap.getFirst(), splitGap.getSecond());
+ logger.info("finish filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
+ }
}
} else {
startOneOffCubeStreaming(bootstrapConfig.getCubeName(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
@@ -98,7 +107,20 @@ public class StreamingCLI {
System.exit(-1);
}
}
-
+
+ private static List<Pair<Long, Long>> splitGap(Pair<Long, Long> gap, long maxFillGapRange) {
+ List<Pair<Long, Long>> gaps = Lists.newArrayList();
+ Long startTime = gap.getFirst();
+
+ while (startTime < gap.getSecond()) {
+ Long endTime = gap.getSecond() <= startTime + maxFillGapRange ? gap.getSecond() : startTime + maxFillGapRange;
+ gaps.add(Pair.newPair(startTime, endTime));
+ startTime = endTime;
+ }
+
+ return gaps;
+ }
+
private static void startOneOffCubeStreaming(String cubeName, long start, long end) {
final Runnable runnable = new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, end).build();
runnable.run();