You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/03/31 13:02:06 UTC

kylin git commit: KYLIN-1500: Split large gap for solving out of memory issue when streaming_fillgap

Repository: kylin
Updated Branches:
  refs/heads/1.4-rc b0388aa98 -> 25a5dcdf5


KYLIN-1500: Split large gap for solving out of memory issue when streaming_fillgap

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/25a5dcdf
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/25a5dcdf
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/25a5dcdf

Branch: refs/heads/1.4-rc
Commit: 25a5dcdf5c5eff509b3041f69af20df52edb1c01
Parents: b0388aa
Author: yangzhong <ya...@ebay.com>
Authored: Thu Mar 31 18:23:59 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Mar 31 19:01:59 2016 +0800

----------------------------------------------------------------------
 .../kylin/engine/streaming/BootstrapConfig.java |  9 +++++++
 .../engine/streaming/cli/StreamingCLI.java      | 26 ++++++++++++++++++--
 2 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/25a5dcdf/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
index 4b51bc0..ea1e46f 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
@@ -29,6 +29,7 @@ public class BootstrapConfig {
     private long end = 0L;
 
     private boolean fillGap;
+    private long maxFillGapRange = 4 * 3600 * 1000L;
 
     public long getStart() {
         return start;
@@ -69,4 +70,12 @@ public class BootstrapConfig {
     public void setFillGap(boolean fillGap) {
         this.fillGap = fillGap;
     }
+
+    public long getMaxFillGapRange() {
+        return maxFillGapRange;
+    }
+
+    public void setMaxFillGapRange(long maxFillGapRange) {
+        this.maxFillGapRange = maxFillGapRange;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/25a5dcdf/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index a73a6ac..ba60893 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming.cli;
 
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
@@ -78,6 +79,9 @@ public class StreamingCLI {
                 case "-fillGap":
                     bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
                     break;
+                case "-maxFillGapRange":
+                    bootstrapConfig.setMaxFillGapRange(Long.parseLong(args[++i]));
+                    break;
                 default:
                     logger.warn("ignore this arg:" + argName);
                 }
@@ -88,7 +92,12 @@ public class StreamingCLI {
                 final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
                 logger.info("all gaps:" + StringUtils.join(gaps, ","));
                 for (Pair<Long, Long> gap : gaps) {
-                    startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond());
+                    List<Pair<Long, Long>> splitGaps = splitGap(gap, bootstrapConfig.getMaxFillGapRange());
+                    for (Pair<Long, Long> splitGap : splitGaps) {
+                        logger.info("start filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
+                        startOneOffCubeStreaming(bootstrapConfig.getStreaming(), splitGap.getFirst(), splitGap.getSecond());
+                        logger.info("finish filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
+                    }
                 }
             } else {
                 startOneOffCubeStreaming(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
@@ -101,7 +110,20 @@ public class StreamingCLI {
             System.exit(-1);
         }
     }
-    
+
+    private static List<Pair<Long, Long>> splitGap(Pair<Long, Long> gap, long maxFillGapRange) {
+        List<Pair<Long, Long>> gaps = Lists.newArrayList();
+        Long startTime = gap.getFirst();
+
+        while (startTime < gap.getSecond()) {
+            Long endTime = gap.getSecond() <= startTime + maxFillGapRange ? gap.getSecond() : startTime + maxFillGapRange;
+            gaps.add(Pair.newPair(startTime, endTime));
+            startTime = endTime;
+        }
+
+        return gaps;
+    }
+
     private static void startOneOffCubeStreaming(String streaming, long start, long end) {
         final Runnable runnable = new OneOffStreamingBuilder(streaming, start, end).build();
         runnable.run();