You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/14 03:53:30 UTC

[14/15] kylin git commit: KYLIN-1311 Stream cubing auto assignment and load balance

KYLIN-1311 Stream cubing auto assignment and load balance


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/688b762d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/688b762d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/688b762d

Branch: refs/heads/helix-201602
Commit: 688b762dc5ee756261bc42576935029fb9180f11
Parents: c615dcf
Author: shaofengshi <sh...@apache.org>
Authored: Sat Feb 6 11:49:59 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Feb 6 13:33:07 2016 +0800

----------------------------------------------------------------------
 build/bin/streaming_check.sh                    |  13 ++-
 build/bin/streaming_fillgap.sh                  |   1 -
 build/conf/kylin.properties                     |   6 +-
 .../apache/kylin/common/KylinConfigBase.java    |   4 +
 .../kylin/engine/streaming/StreamingConfig.java |  33 ++++++
 .../engine/streaming/cli/StreamingCLI.java      |   2 +-
 .../streaming/monitor/StreamingMonitor.java     |  11 +-
 .../rest/controller/ClusterController.java      |  55 +++++++---
 .../rest/controller/StreamingController.java    |  52 ++++++++-
 .../kylin/rest/helix/HelixClusterAdmin.java     |  69 +++++++++---
 .../helix/StreamCubeBuildTransitionHandler.java | 105 ++++++++++++++-----
 .../rest/request/StreamingBuildRequest.java     |  13 +--
 .../kylin/rest/service/StreamingService.java    |  27 +++--
 13 files changed, 299 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/build/bin/streaming_check.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_check.sh b/build/bin/streaming_check.sh
index fef0139..4c5431a 100644
--- a/build/bin/streaming_check.sh
+++ b/build/bin/streaming_check.sh
@@ -20,10 +20,9 @@
 source /etc/profile
 source ~/.bash_profile
 
-receivers=$1
-host=$2
-tablename=$3
-authorization=$4
-projectname=$5
-cubename=$6
-sh ${KYLIN_HOME}/bin/kylin.sh monitor -receivers ${receivers} -host ${host} -tableName ${tablename} -authorization ${authorization} -cubeName ${cubename} -projectName ${projectname}
\ No newline at end of file
+CUBE_NAME=$1
+AUTHORIZATION=$2
+KYLIN_HOST=$3
+
+cd ${KYLIN_HOME}
+curl --request PUT --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/checkgap

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/build/bin/streaming_fillgap.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh
index 31c4886..fe8c0b5 100644
--- a/build/bin/streaming_fillgap.sh
+++ b/build/bin/streaming_fillgap.sh
@@ -25,5 +25,4 @@ AUTHORIZATION=$2
 KYLIN_HOST=$3
 
 cd ${KYLIN_HOME}
-#sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin}
 curl --request PUT --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/fillgap

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index b7e9b28..75269de 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -2,12 +2,12 @@
 # Whether this kylin run as an instance of a cluster
 kylin.cluster.enabled=false
 
-# Comma separated list of zk servers; 
-# Optional; if absent, will use HBase zookeeper; set if use a different zk;
+# Comma-separated list of ZooKeeper servers used for cluster coordination;
+# Optional; if absent, the HBase ZooKeeper is used; set it to use a different ZooKeeper;
 kylin.zookeeper.address=
 
 # REST address of this instance, need be accessible from other instances;
-# optional, default be <hostname>:7070
+# optional, defaults to <hostname_fqdn>:<port>
 kylin.rest.address=
 
 # whether run a cluster controller in this instance; a robust cluster need at least 3 controllers.

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7c127f7..87e4566 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -541,6 +541,10 @@ public class KylinConfigBase implements Serializable {
     public String getClusterName() {
         return this.getOptional("kylin.cluster.name", getMetadataUrlPrefix());
     }
+    
+    public int getClusterMaxPartitionPerRegion() {
+        return Integer.parseInt(getOptional("kylin.cluster.max.partition.per.resource", "100"));
+    }
 
     public void setClusterName(String clusterName) {
         setProperty("kylin.cluster.name", clusterName);

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
index f0a7ab1..ee9aed8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
@@ -39,6 +39,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -64,6 +65,14 @@ public class StreamingConfig extends RootPersistentEntity {
     @JsonProperty("cubeName")
     private String cubeName;
 
+    @JsonProperty("partitions")
+    private List<String> partitions;
+
+    @JsonProperty("max_gap")
+    private long maxGap = 30 * 60 * 1000l; // 30 minutes
+    @JsonProperty("max_gap_number")
+    private int maxGapNumber = 10; // 10
+    
     public String getCubeName() {
         return cubeName;
     }
@@ -96,6 +105,30 @@ public class StreamingConfig extends RootPersistentEntity {
         return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + streamingName + ".json";
     }
 
+    public List<String> getPartitions() {
+        return partitions;
+    }
+
+    public void setPartitions(List<String> partitions) {
+        this.partitions = partitions;
+    }
+
+    public long getMaxGap() {
+        return maxGap;
+    }
+
+    public void setMaxGap(long maxGap) {
+        this.maxGap = maxGap;
+    }
+
+    public int getMaxGapNumber() {
+        return maxGapNumber;
+    }
+
+    public void setMaxGapNumber(int maxGapNumber) {
+        this.maxGapNumber = maxGapNumber;
+    }
+
     @Override
     public StreamingConfig clone() {
         try {

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index 96ad1ad..88f5e18 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -82,7 +82,7 @@ public class StreamingCLI {
             }
             if (bootstrapConfig.isFillGap()) {
                 final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(bootstrapConfig.getStreaming());
-                final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+                final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName(), streamingConfig.getMaxGap());
                 logger.info("all gaps:" + StringUtils.join(gaps, ","));
                 for (Pair<Long, Long> gap : gaps) {
                     startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond());

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
index 9609442..9d2bd45 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
@@ -74,7 +74,7 @@ public class StreamingMonitor {
         sendMail(receivers, title, stringBuilder.toString());
     }
 
-    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
+    public static final List<Pair<Long, Long>> findGaps(String cubeName, long maxGapAtOnce) {
         List<CubeSegment> segments = getSortedReadySegments(cubeName);
         List<Pair<Long, Long>> gaps = Lists.newArrayList();
         for (int i = 0; i < segments.size() - 1; ++i) {
@@ -83,7 +83,12 @@ public class StreamingMonitor {
             if (first.getDateRangeEnd() == second.getDateRangeStart()) {
                 continue;
             } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
-                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
+                long start = first.getDateRangeEnd();
+                while (start < second.getDateRangeStart()) {
+                    long end = Math.min(start + maxGapAtOnce, second.getDateRangeStart());
+                    gaps.add(Pair.newPair(start, end));
+                    start = end;
+                }
             }
         }
         return gaps;
@@ -119,7 +124,7 @@ public class StreamingMonitor {
             logger.info("cube:" + cubeName + " does not exist");
             return;
         }
-        List<Pair<Long, Long>> gaps = findGaps(cubeName);
+        List<Pair<Long, Long>> gaps = findGaps(cubeName, Long.MAX_VALUE);
         List<Pair<String, String>> overlaps = Lists.newArrayList();
         StringBuilder content = new StringBuilder();
         if (!gaps.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
index 97fff36..86a0398 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
@@ -19,23 +19,26 @@
 package org.apache.kylin.rest.controller;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.constant.JobStatusEnum;
-import org.apache.kylin.job.constant.JobTimeFilterEnum;
-import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.helix.HelixClusterAdmin;
-import org.apache.kylin.rest.request.JobListRequest;
-import org.apache.kylin.rest.service.JobService;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
+import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.stereotype.Controller;
-import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
 
-import java.util.*;
+import java.io.IOException;
+import java.util.Collection;
 
 /**
  * 
@@ -56,15 +59,37 @@ public class ClusterController extends BasicController implements InitializingBe
 
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
 
-        final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
-        clusterAdmin.start();
+        if (kylinConfig.isClusterEnabled()) {
+            final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
+            clusterAdmin.start();
 
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                clusterAdmin.stop();
+            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    clusterAdmin.stop();
+                }
+            }));
+        } else {
+            String serverMode = kylinConfig.getServerMode();
+            if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) {
+                logger.info("Initializing Job Engine ....");
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            DefaultScheduler scheduler = DefaultScheduler.createInstance();
+                            scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
+                            if (!scheduler.hasStarted()) {
+                                logger.error("scheduler has not been started");
+                                System.exit(1);
+                            }
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }).start();
             }
-        }));
+        }
 
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 209c552..e33a1c9 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -24,9 +24,11 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.exception.InternalErrorException;
@@ -249,7 +251,12 @@ public class StreamingController extends BasicController {
         }
 
         streamingBuildRequest.setStreaming(streamingConfig.getName());
-        streamingService.buildStream(cube, streamingBuildRequest);
+        try {
+            streamingService.buildStream(cube, streamingBuildRequest);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return streamingBuildRequest;
+        }
         streamingBuildRequest.setMessage("Build request is submitted successfully.");
         streamingBuildRequest.setSuccessful(true);
         return streamingBuildRequest;
@@ -274,13 +281,52 @@ public class StreamingController extends BasicController {
 
         StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest();
         streamingBuildRequest.setStreaming(streamingConfig.getName());
-        streamingService.fillGap(cube);
-        streamingBuildRequest.setMessage("FillGap request is submitted successfully.");
+        List<Pair<Long, Long>> gaps = null;
+        try {
+            gaps = streamingService.fillGap(cube);
+        } catch (IOException e) {
+            logger.error("", e);
+            return streamingBuildRequest;
+        }
+        streamingBuildRequest.setMessage("FillGap request is submitted successfully, gap number: " + gaps.size());
+        streamingBuildRequest.setSuccessful(true);
+        return streamingBuildRequest;
+
+    }
+
+    /**
+     * Check whether any gap exists between the segments of a cube.
+     *
+     * @param cubeName Cube Name
+     * @return a StreamingBuildRequest whose message lists the gaps found,
+     *         or "No gap." when there is none
+     */
+    @RequestMapping(value = "/{cubeName}/checkgap", method = { RequestMethod.PUT })
+    @ResponseBody
+    public StreamingBuildRequest checkGap(@PathVariable String cubeName) {
+        StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
+        Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "' is not found.");
+        List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
+        Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
+        CubeInstance cube = cubes.get(0);
+
+        List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName(), streamingConfig.getMaxGap());
+        logger.info("all gaps:" + StringUtils.join(gaps, ","));
+        
+        StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest();
+        streamingBuildRequest.setStreaming(streamingConfig.getName());
+        if (gaps.size() > 0) {
+            streamingBuildRequest.setMessage(gaps.size() + " gaps in cube: " + StringUtils.join(gaps, ","));
+        } else {
+            streamingBuildRequest.setMessage("No gap.");
+        }
         streamingBuildRequest.setSuccessful(true);
         return streamingBuildRequest;
 
     }
 
+    
+
     public void setStreamingService(StreamingService streamingService) {
         this.streamingService = streamingService;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
index 4da9a86..680e371 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -18,11 +18,9 @@
 package org.apache.kylin.rest.helix;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.helix.*;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.HelixControllerMain;
@@ -32,11 +30,11 @@ import org.apache.helix.tools.StateModelConfigGenerator;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.StreamingBuildRequest;
 import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,7 +66,7 @@ public class HelixClusterAdmin {
 
     private static final Logger logger = LoggerFactory.getLogger(HelixClusterAdmin.class);
     private final String zkAddress;
-    private final ZKHelixAdmin admin;
+    private final HelixAdmin admin;
     private final String clusterName;
 
     private HelixClusterAdmin(KylinConfig kylinConfig) {
@@ -80,7 +78,7 @@ public class HelixClusterAdmin {
             zkAddress = HBaseConnection.getZKConnectString();
             logger.info("no 'kylin.zookeeper.address' in kylin.properties, use HBase zookeeper " + zkAddress);
         }
-        
+
         this.clusterName = kylinConfig.getClusterName();
         this.admin = new ZKHelixAdmin(zkAddress);
     }
@@ -130,24 +128,59 @@ public class HelixClusterAdmin {
 
     }
 
-    public void addStreamingJob(StreamingBuildRequest streamingBuildRequest) {
+    public void addStreamingJob(StreamingBuildRequest streamingBuildRequest) throws IOException {
         String resourceName = streamingBuildRequest.toResourceName();
-        if (admin.getResourcesInCluster(clusterName).contains(resourceName)) {
-            logger.warn("Resource '" + resourceName + "' already exists in cluster, remove and re-add.");
-            admin.dropResource(clusterName, resourceName);
+        if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) {
+            logger.info("Resource '" + resourceName + "' is new, add it with 0 partitions in cluster.");
+            admin.addResource(clusterName, resourceName, 0, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name());
         }
-        admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name());
-        rebalanceWithTag(resourceName, TAG_STREAM_BUILDER);
 
+        IdealState idealState = admin.getResourceIdealState(clusterName, resourceName);
+
+        StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming());
+        List<String> partitions = streamingConfig.getPartitions();
+        if (partitions == null) {
+            partitions = Lists.newArrayList();
+        }
+
+        if (partitions.size() != idealState.getNumPartitions() || idealState.getNumPartitions() >= kylinConfig.getClusterMaxPartitionPerRegion()) {
+            if (partitions.size() != idealState.getNumPartitions()) {
+                logger.error("Cluster resource partition number doesn't match with the partitions in StreamingConfig: " + resourceName);
+            } else {
+                logger.error("Partitions number for resource '" + resourceName + " exceeds the up limit: " + kylinConfig.getClusterMaxPartitionPerRegion());
+            }
+            logger.info("Drop and create resource: " + resourceName);
+            cleanResourcePartitions(resourceName);
+            idealState = admin.getResourceIdealState(clusterName, resourceName);
+            streamingConfig.getPartitions().clear();
+            StreamingManager.getInstance(kylinConfig).updateStreamingConfig(streamingConfig);
+            streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming());
+            partitions = Lists.newArrayList();
+        }
+
+        partitions.add(streamingBuildRequest.toPartitionName());
+        streamingConfig.setPartitions(partitions);
+        StreamingManager.getInstance(kylinConfig).updateStreamingConfig(streamingConfig);
+
+        idealState.setNumPartitions(idealState.getNumPartitions() + 1);
+        admin.setResourceIdealState(clusterName, resourceName, idealState);
+        rebalanceWithTag(resourceName, TAG_STREAM_BUILDER);
     }
 
-    public void dropStreamingJob(String streamingName, long start, long end) {
-        String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end;
-        admin.dropResource(clusterName, resourceName);
+
+    private void cleanResourcePartitions(String resourceName) {
+        IdealState is = admin.getResourceIdealState(clusterName, resourceName);
+        is.getRecord().getListFields().clear();
+        is.getRecord().getMapFields().clear();
+        is.setNumPartitions(0);
+        admin.setResourceIdealState(clusterName, resourceName, is);
+
+        logger.info("clean all partitions in resource: " + resourceName);
     }
 
     /**
      * Start the instance and register the state model factory
+     *
      * @param instanceName
      * @throws Exception
      */
@@ -161,11 +194,11 @@ public class HelixClusterAdmin {
 
     /**
      * Rebalance the resource with the tags
+     *
      * @param tags
      */
     protected void rebalanceWithTag(String resourceName, String tag) {
-        List<String> instances = admin.getInstancesInClusterWithTag(clusterName, tag);
-        admin.rebalance(clusterName, resourceName, instances.size(), "", tag);
+        admin.rebalance(clusterName, resourceName, 2, null, tag);
     }
 
     /**
@@ -206,6 +239,7 @@ public class HelixClusterAdmin {
 
     /**
      * Check whether current kylin instance is in the leader role
+     *
      * @return
      */
     public boolean isLeaderRole(String resourceName) {
@@ -220,6 +254,7 @@ public class HelixClusterAdmin {
 
     /**
      * Add instance to cluster, with a tag list
+     *
      * @param instanceName should be unique in format: hostName_port
      * @param tags
      */

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
index 44d8302..705d8a7 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
@@ -11,6 +11,7 @@ import org.apache.kylin.common.KylinConfigBase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.rest.request.StreamingBuildRequest;
 import org.slf4j.Logger;
@@ -43,43 +44,81 @@ public class StreamCubeBuildTransitionHandler extends TransitionHandler {
     @Transition(to = "LEADER", from = "STANDBY")
     public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
         String resourceName = message.getResourceId().stringify();
-        StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName);
+        final StreamingBuildRequest streamingBuildRequest = getStreamingBuildRequest(resourceName, message.getPartitionName());
+        if (streamingBuildRequest != null && isSuccessfullyBuilt(streamingBuildRequest) == false) {
+            KylinConfigBase.getKylinHome();
+            String segmentId = streamingBuildRequest.toPartitionName();
+            String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingBuildRequest.getStreaming() + " " + segmentId + " -oneoff true -start " + streamingBuildRequest.getStart() + " -end " + streamingBuildRequest.getEnd() + " -streaming " + streamingBuildRequest.getStreaming();
+            runCMD(cmd);
+        }
+    }
 
-        final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming()).getCubeName();
+    @Transition(to = "STANDBY", from = "LEADER")
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        String resourceName = message.getResourceId().stringify();
+        logger.info("Partition " + message.getPartitionId() + " becomes as Standby");
+        /*
+        final StreamingBuildRequest streamingBuildRequest = getStreamingBuildRequest(resourceName, message.getPartitionName());
+        if (isSuccessfullyBuilt(streamingBuildRequest) == false) {
+            KylinConfigBase.getKylinHome();
+            String segmentId = streamingBuildRequest.toPartitionName();
+            String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming stop " + streamingBuildRequest.getStreaming() + " " + segmentId;
+            runCMD(cmd);
+        }
+        */
+    }
+
+    private boolean isSuccessfullyBuilt(StreamingBuildRequest streamingBuildRequest) {
+        final StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming());
+        final String cubeName = streamingConfig.getCubeName();
         final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         for (CubeSegment segment : cube.getSegments()) {
             if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
-                logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
-                return;
+                logger.info("Segment " + segment.getName() + " already exist.");
+                return true;
             }
         }
 
-        KylinConfigBase.getKylinHome();
-        String segmentId = streamingBuildRequest.getStart() + "_" + streamingBuildRequest.getEnd();
-        String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingBuildRequest.getStreaming() + " " + segmentId + " -oneoff true -start " + streamingBuildRequest.getStart() + " -end " + streamingBuildRequest.getEnd() + " -streaming " + streamingBuildRequest.getStreaming();
-        logger.info("Executing: " + cmd);
-        try {
-            String line;
-            Process p = Runtime.getRuntime().exec(cmd);
-            BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
-            while ((line = input.readLine()) != null) {
-                logger.info(line);
+        return false;
+    }
+
+    private StreamingBuildRequest getStreamingBuildRequest(String resourceName, String partitionName) {
+        String streamConfigName = resourceName.substring(HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX.length());
+        int partitionId = Integer.parseInt(partitionName.substring(partitionName.lastIndexOf("_") + 1));
+
+        StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamConfigName);
+
+        int retry = 0;
+        while ((streamingConfig.getPartitions() == null || streamingConfig.getPartitions().isEmpty() || partitionId > (streamingConfig.getPartitions().size() - 1) && retry < 10)) {
+            logger.error("No segment information in StreamingConfig '" + streamConfigName + "' for partition " + partitionId);
+            logger.error("Wait for 0.5 second...");
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                logger.error("", e);
             }
-            input.close();
-        } catch (IOException err) {
-            logger.error("Error happens during build streaming  '" + resourceName + "'", err);
-            throw new RuntimeException(err);
+            streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamConfigName);
+            retry++;
         }
 
+        if (retry >= 10) {
+            logger.error("No segment information in StreamingConfig '" + streamConfigName + "' for partition " + partitionId);
+            logger.warn("Abor building...");
+            return null;
+        }
+
+        String startEnd = streamingConfig.getPartitions().get(partitionId);
+        long start = Long.parseLong(startEnd.substring(0, startEnd.indexOf("_")));
+        long end = Long.parseLong(startEnd.substring(startEnd.indexOf("_") + 1));
+        StreamingBuildRequest request = new StreamingBuildRequest();
+        request.setStreaming(streamConfigName);
+        request.setStart(start);
+        request.setEnd(end);
+        return request;
+
     }
 
-    @Transition(to = "STANDBY", from = "LEADER")
-    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
-        String resourceName = message.getResourceId().stringify();
-        StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName);
-        KylinConfigBase.getKylinHome();
-        String segmentId = streamingBuildRequest.getStart() + "_" + streamingBuildRequest.getEnd();
-        String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming stop " + streamingBuildRequest.getStreaming() + " " + segmentId;
+    private void runCMD(String cmd) {
         logger.info("Executing: " + cmd);
         try {
             String line;
@@ -90,9 +129,10 @@ public class StreamCubeBuildTransitionHandler extends TransitionHandler {
             }
             input.close();
         } catch (IOException err) {
-            logger.error("Error happens during build streaming  '" + resourceName + "'", err);
+            logger.error("Error happens when running '" + cmd + "'", err);
             throw new RuntimeException(err);
         }
+
     }
 
     @Transition(to = "STANDBY", from = "OFFLINE")
@@ -104,4 +144,16 @@ public class StreamCubeBuildTransitionHandler extends TransitionHandler {
     public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
 
     }
+
+    /** Helix OFFLINE -> DROPPED transition: no work is done, the transition is only logged. */
+    @Transition(to = "DROPPED", from = "OFFLINE")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) throws Exception {
+        logger.info("Default OFFLINE->DROPPED transition invoked.");
+    }
+
+    /** Helix DROPPED -> OFFLINE transition: no work is done, the transition is only logged. */
+    @Transition(to = "OFFLINE", from = "DROPPED")
+    public void onBecomeOfflineFromDropped(Message message, NotificationContext context) throws Exception {
+        logger.info("Default DROPPED->OFFLINE transition invoked.");
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
index dcf91fd..201568e 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
@@ -81,16 +81,17 @@ public class StreamingBuildRequest {
     }
 
+    /**
+     * Helix resource name for this request: the stream-cube prefix followed by the streaming config name.
+     * It does not include start/end, so requests for different ranges of the same config share this name.
+     */
     public String toResourceName() {
-        return HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX + streaming + "_" + start + "_" + end;
+        return HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX + streaming;
     }
 
-    public static StreamingBuildRequest fromResourceName(String resourceName) {
-        Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
-        long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1));
-        String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
-        long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1));
-        String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
-
-        return new StreamingBuildRequest(streamingConfig, start, end);
+    /**
+     * Partition name for this request's range, formatted as {@code start + "_" + end}.
+     */
+    public String toPartitionName() {
+        return start + "_" + end;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index 7c2cc48..6e732d9 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.rest.service;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
@@ -98,20 +100,50 @@ public class StreamingService extends BasicService {
     }
 
+    /**
+     * Submits {@code streamingBuildRequest} via {@code HelixClusterAdmin.addStreamingJob}. An IOException
+     * raised by the submission is logged and reported back on the request (successful=false plus an error
+     * message) instead of being propagated.
+     * NOTE(review): since that IOException is caught, confirm whether the declared {@code throws IOException}
+     * can still be reached (e.g. from {@code HelixClusterAdmin.getInstance}).
+     */
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
-    public void buildStream(CubeInstance cube, StreamingBuildRequest streamingBuildRequest) {
+    public void buildStream(CubeInstance cube, StreamingBuildRequest streamingBuildRequest) throws IOException {
         HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv());
-        clusterAdmin.addStreamingJob(streamingBuildRequest);
+        try {
+            clusterAdmin.addStreamingJob(streamingBuildRequest);
+        } catch (IOException e) {
+            String message = "Failed to submit job for " + streamingBuildRequest.getStreaming();
+            logger.error(message, e);
+            streamingBuildRequest.setSuccessful(false);
+            streamingBuildRequest.setMessage(message);
+        }
     }
 
+    /**
+     * Looks up the gaps of the streaming config bound to {@code cube} and submits a build job for at most
+     * {@code streamingConfig.getMaxGapNumber()} of them, taken in the order {@code StreamingMonitor.findGaps}
+     * returns them.
+     *
+     * @return the gaps for which a job was submitted
+     * @throws IOException if submitting a job fails; jobs already submitted in this call are not rolled back
+     */
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
-    public void fillGap(CubeInstance cube) {
+    public List<Pair<Long, Long>> fillGap(CubeInstance cube) throws IOException {
         HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv());
         final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfigByCube(cube.getName());
-        final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
-        logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ","));
-        for (Pair<Long, Long> gap : gaps) {
+        final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName(), streamingConfig.getMaxGap());
+        logger.info("all gaps:" + StringUtils.join(gaps, ","));
+
+        // Submit at most getMaxGapNumber() jobs per call; a non-positive limit submits none.
+        final int gapsToFill = Math.min(gaps.size(), streamingConfig.getMaxGapNumber());
+        List<Pair<Long, Long>> filledGap = Lists.newArrayList();
+        for (int i = 0; i < gapsToFill; i++) {
+            Pair<Long, Long> gap = gaps.get(i);
             StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest(streamingConfig.getName(), gap.getFirst(), gap.getSecond());
             clusterAdmin.addStreamingJob(streamingBuildRequest);
+            filledGap.add(gap);
         }
+
+        return filledGap;
     }
 }