You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/14 03:53:25 UTC

[09/15] kylin git commit: KYLIN-1311 fix small bug

KYLIN-1311 fix small bug


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bbfe8ae8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bbfe8ae8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bbfe8ae8

Branch: refs/heads/helix-201602
Commit: bbfe8ae84769ecae7de314fb6a8d3c700b654ee9
Parents: 1aaa267
Author: shaofengshi <sh...@apache.org>
Authored: Fri Jan 15 17:57:26 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Feb 6 13:33:06 2016 +0800

----------------------------------------------------------------------
 .../engine/streaming/StreamingManager.java      | 11 +++++-----
 .../rest/controller/StreamingController.java    | 13 ++++++------
 .../helix/LeaderStandbyStateModelFactory.java   | 21 +++++++++++++++++---
 3 files changed, 30 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/bbfe8ae8/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index af04a11..798fc3f 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -137,6 +137,12 @@ public class StreamingManager {
         return streamingMap.get(name);
     }
 
+    public StreamingConfig getStreamingConfigByCube(String cubeName) {
+        String streamingConfig = cubeName + "_streaming";
+        return getStreamingConfig(streamingConfig);
+    }
+
+
     public List<StreamingConfig> listAllStreaming() {
         return new ArrayList<>(streamingMap.values());
     }
@@ -168,11 +174,6 @@ public class StreamingManager {
         streamingMap.remove(streamingConfig.getName());
     }
 
-    public StreamingConfig getConfig(String name) {
-        name = name.toUpperCase();
-        return streamingMap.get(name);
-    }
-
     public void removeStreamingLocal(String streamingName) {
         streamingMap.removeLocal(streamingName);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/bbfe8ae8/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 57831d5..fb806d1 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -236,13 +236,11 @@ public class StreamingController extends BasicController {
      * @return
      * @throws IOException
      */
-    @RequestMapping(value = "/{streamingName}/build", method = {RequestMethod.PUT})
+    @RequestMapping(value = "/{cubeName}/build", method = {RequestMethod.PUT})
     @ResponseBody
-    public StreamingBuildRequest buildStream(@PathVariable String streamingName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
-        streamingBuildRequest.setStreaming(streamingName);
-        StreamingConfig streamingConfig = streamingService.getStreamingManager().getConfig(streamingName);
-        Preconditions.checkNotNull(streamingConfig, "Stream config '" + streamingName + "' is not found.");
-        String cubeName = streamingConfig.getCubeName();
+    public StreamingBuildRequest buildStream(@PathVariable String cubeName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
+        StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
+        Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "' is not found.");
         List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
         Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
         CubeInstance cube = cubes.get(0);
@@ -257,7 +255,8 @@ public class StreamingController extends BasicController {
             }
         }
 
-        streamingService.buildStream(streamingName, streamingBuildRequest);
+        streamingBuildRequest.setStreaming(streamingConfig.getName());
+        streamingService.buildStream(cubeName, streamingBuildRequest);
         streamingBuildRequest.setMessage("Build request is submitted successfully.");
         streamingBuildRequest.setSuccessful(true);
         return streamingBuildRequest;

http://git-wip-us.apache.org/repos/asf/kylin/blob/bbfe8ae8/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index df23ea0..8614e8c 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -10,6 +10,10 @@ import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.lock.MockJobLock;
@@ -48,7 +52,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
         public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
             logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
             try {
-                KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+                final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
                 DefaultScheduler scheduler = DefaultScheduler.createInstance();
                 scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
                 while (!scheduler.hasStarted()) {
@@ -89,11 +93,22 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
         public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
             String resourceName = message.getResourceId().stringify();
             Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
-            long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")) + 1);
+            long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1));
             String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
-            long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) + 1);
+            long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1));
             String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
 
+            final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            
+            final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingConfig).getCubeName();
+            final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+            for (CubeSegment segment : cube.getSegments()) {
+                if (segment.getDateRangeStart() <= start && segment.getDateRangeEnd() >= end) {
+                    logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
+                    return;
+                }
+            }
+            
             KylinConfigBase.getKylinHome();
             String segmentId = start + "_" + end;
             String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end + " -streaming " + streamingConfig;