You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/04 03:03:11 UTC

[30/43] kylin git commit: KYLIN-1311 Stream cubing auto assignment and load balance

KYLIN-1311 Stream cubing auto assignment and load balance


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ab60480f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ab60480f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ab60480f

Branch: refs/heads/helix-rebase
Commit: ab60480f244a106a4e3179590956a04246e9f4db
Parents: 55558551
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jan 14 14:59:54 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Mar 4 09:52:19 2016 +0800

----------------------------------------------------------------------
 .../kylin/engine/streaming/BootstrapConfig.java |  8 --
 .../engine/streaming/cli/StreamingCLI.java      |  3 -
 .../kylin/rest/controller/CubeController.java   |  5 ++
 .../rest/controller/StreamingController.java    | 50 +++++++++++++
 .../kylin/rest/helix/HelixClusterAdmin.java     | 13 +++-
 .../helix/LeaderStandbyStateModelFactory.java   | 43 +++++++----
 .../rest/request/StreamingBuildRequest.java     | 77 ++++++++++++++++++++
 .../kylin/rest/request/StreamingRequest.java    |  4 +-
 .../kylin/rest/service/StreamingService.java    | 27 +++++++
 9 files changed, 201 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
index a3e2db5..2b83b84 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
@@ -36,14 +36,6 @@ public class BootstrapConfig {
         this.streaming = streaming;
     }
 
-    public int getPartitionId() {
-        return partitionId;
-    }
-
-    public void setPartitionId(int partitionId) {
-        this.partitionId = partitionId;
-    }
-
     public boolean isFillGap() {
         return fillGap;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index a73a6ac..96ad1ad 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -72,9 +72,6 @@ public class StreamingCLI {
                 case "-streaming":
                     bootstrapConfig.setStreaming(args[++i]);
                     break;
-                case "-partition":
-                    bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
-                    break;
                 case "-fillGap":
                     bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
                     break;

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 9afa750..4ab640f 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -27,14 +27,19 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.streaming.BootstrapConfig;
 import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.exception.JobException;

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index e22bd30..57831d5 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -21,14 +21,23 @@ package org.apache.kylin.rest.controller;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonMappingException;
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.engine.streaming.BootstrapConfig;
 import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.exception.NotFoundException;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
 import org.apache.kylin.rest.request.StreamingRequest;
+import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.KafkaConfigService;
 import org.apache.kylin.rest.service.StreamingService;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -36,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.access.AccessDeniedException;
+import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.*;
 
@@ -58,6 +68,9 @@ public class StreamingController extends BasicController {
     @Autowired
     private KafkaConfigService kafkaConfigService;
 
+    @Autowired
+    private CubeService cubeService;
+
     @RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
     @ResponseBody
     public List<StreamingConfig> getStreamings(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
@@ -214,6 +227,43 @@ public class StreamingController extends BasicController {
         request.setMessage(message);
     }
 
+
+
+    /**
+     * Send a stream build request
+     *
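+     * @param streamingName name of the streaming config to build
+     * @param streamingBuildRequest the build request (start/end time range, or the fill-gap flag)
+     * @return the same request object, with its message and successful flag set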
+     */
+    @RequestMapping(value = "/{streamingName}/build", method = {RequestMethod.PUT})
+    @ResponseBody
+    public StreamingBuildRequest buildStream(@PathVariable String streamingName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
+        streamingBuildRequest.setStreaming(streamingName);
+        StreamingConfig streamingConfig = streamingService.getStreamingManager().getConfig(streamingName);
+        Preconditions.checkNotNull(streamingConfig, "Stream config '" + streamingName + "' is not found.");
+        String cubeName = streamingConfig.getCubeName();
+        List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
+        Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
+        CubeInstance cube = cubes.get(0);
+        if (streamingBuildRequest.isFillGap() == false) {
+            Preconditions.checkArgument(streamingBuildRequest.getEnd() > streamingBuildRequest.getStart(), "End time should be greater than start time.");
+            for (CubeSegment segment : cube.getSegments()) {
+                if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
+                    streamingBuildRequest.setMessage("The segment already exists: " + segment.toString());
+                    streamingBuildRequest.setSuccessful(false);
+                    return streamingBuildRequest;
+                }
+            }
+        }
+
+        streamingService.buildStream(streamingName, streamingBuildRequest);
+        streamingBuildRequest.setMessage("Build request is submitted successfully.");
+        streamingBuildRequest.setSuccessful(true);
+        return streamingBuildRequest;
+
+    }
+
     public void setStreamingService(StreamingService streamingService) {
         this.streamingService= streamingService;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
index 6300383..f62204d 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -45,7 +45,7 @@ import java.util.concurrent.ConcurrentMap;
 public class HelixClusterAdmin {
 
     public static final String RESOURCE_NAME_JOB_ENGINE = "Resource_JobEngine";
-    public static final String RESOURCE_STREAME_CUBE_PREFIX = "Resource_Streame_";
+    public static final String RESOURCE_STREAME_CUBE_PREFIX = "Resource_Stream_";
 
     public static final String MODEL_LEADER_STANDBY = "LeaderStandby";
     public static final String MODEL_ONLINE_OFFLINE = "OnlineOffline";
@@ -115,15 +115,22 @@ public class HelixClusterAdmin {
 
     }
     
-    public void addStreamCubeSlice(String cubeName, long start, long end) {
-        String resourceName = RESOURCE_STREAME_CUBE_PREFIX + cubeName + "_" + start + "_" + end;
+    public void addStreamingJob(String streamingName, long start, long end) {
+        String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end;
         if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) {
             admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
+        } else {
+            logger.warn("Resource '" + resourceName + "' already exists in cluster, skip adding.");
         }
 
         admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER);
         
     }
+    
+    public void dropStreamingJob(String streamingName, long start, long end) {
+        String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end;
+        admin.dropResource(clusterName, resourceName);
+    }
 
     /**
      * Start the instance and register the state model factory

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index c2a78e7..df23ea0 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -9,21 +9,24 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
-import org.apache.kylin.engine.streaming.cli.StreamingCLI;
+import org.apache.kylin.common.KylinConfigBase;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.lock.MockJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
 import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX;
 
 /**
  */
 public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
     private static final Logger logger = LoggerFactory.getLogger(LeaderStandbyStateModelFactory.class);
-    
+
     @Override
     public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
         if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) {
@@ -38,7 +41,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
     }
 
     public static class JobEngineStateModel extends TransitionHandler {
-        
+
         public static JobEngineStateModel INSTANCE = new JobEngineStateModel();
 
         @Transition(to = "LEADER", from = "STANDBY")
@@ -62,7 +65,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
         public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
             logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()");
             DefaultScheduler.destroyInstance();
-            
+
         }
 
         @Transition(to = "STANDBY", from = "OFFLINE")
@@ -71,7 +74,6 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
 
         }
 
-
         @Transition(to = "OFFLINE", from = "STANDBY")
         public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
             logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()");
@@ -80,7 +82,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
     }
 
     public static class StreamCubeStateModel extends TransitionHandler {
-        
+
         public static StreamCubeStateModel INSTANCE = new StreamCubeStateModel();
 
         @Transition(to = "LEADER", from = "STANDBY")
@@ -90,27 +92,40 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
             long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")) + 1);
             String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
             long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) + 1);
-            String cubeName = temp.substring(0, temp.lastIndexOf("_"));
+            String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
+
+            KylinConfigBase.getKylinHome();
+            String segmentId = start + "_" + end;
+            String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end + " -streaming " + streamingConfig;
+            logger.info("Executing: " + cmd);
+            try {
+                String line;
+                Process p = Runtime.getRuntime().exec(cmd);
+                BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
+                while ((line = input.readLine()) != null) {
+                    logger.info(line);
+                }
+                input.close();
+            } catch (IOException err) {
+                logger.error("Error happens during build streaming  '" + resourceName + "'", err);
+                throw new RuntimeException(err);
+            }
 
-            final Runnable runnable = new OneOffStreamingBuilder(cubeName, start, end).build();
-            runnable.run();
         }
 
         @Transition(to = "STANDBY", from = "LEADER")
         public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
-           
 
         }
 
         @Transition(to = "STANDBY", from = "OFFLINE")
         public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
-           
-        }
 
+        }
 
         @Transition(to = "OFFLINE", from = "STANDBY")
         public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
-           
+
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
new file mode 100644
index 0000000..e06a06c
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.request;
+
+public class StreamingBuildRequest {
+
+    private String streaming;
+    private long start;
+    private long end;
+    private boolean fillGap;
+    private String message;
+    private boolean successful;
+
+    public String getStreaming() {
+        return streaming;
+    }
+
+    public void setStreaming(String streaming) {
+        this.streaming = streaming;
+    }
+
+    public boolean isSuccessful() {
+        return successful;
+    }
+
+    public void setSuccessful(boolean successful) {
+        this.successful = successful;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public long getStart() {
+        return start;
+    }
+
+    public void setStart(long start) {
+        this.start = start;
+    }
+
+    public long getEnd() {
+        return end;
+    }
+
+    public void setEnd(long end) {
+        this.end = end;
+    }
+
+    public boolean isFillGap() {
+        return fillGap;
+    }
+
+    public void setFillGap(boolean fillGap) {
+        this.fillGap = fillGap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
index 07c30f3..b737c3e 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
@@ -19,7 +19,9 @@
 
 package org.apache.kylin.rest.request;
 
-import java.lang.String;public class StreamingRequest {
+import java.lang.String;
+
+public class StreamingRequest {
 
     private String project;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index e40426b..da20949 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -18,12 +18,22 @@
 
 package org.apache.kylin.rest.service;
 
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.streaming.BootstrapConfig;
 import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.access.prepost.PostFilter;
+import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
@@ -33,6 +43,7 @@ import java.util.List;
 @Component("streamingMgmtService")
 public class StreamingService extends BasicService {
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamingService.class);
     @Autowired
     private AccessService accessService;
 
@@ -87,4 +98,20 @@ public class StreamingService extends BasicService {
         getStreamingManager().removeStreamingConfig(config);
     }
 
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
+    public void buildStream(String cube, StreamingBuildRequest streamingBuildRequest) {
+        HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv());
+        if (streamingBuildRequest.isFillGap()) {
+            final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingBuildRequest.getStreaming());
+            final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+            logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ","));
+            for (Pair<Long, Long> gap : gaps) {
+                clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), gap.getFirst(), gap.getSecond());
+            }
+        } else {
+            clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), streamingBuildRequest.getStart(), streamingBuildRequest.getEnd());
+        }
+    }
+
 }