You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/03 04:30:58 UTC

[11/25] kylin git commit: KYLIN-1311 Stream cubing auto assignment and load balance

KYLIN-1311 Stream cubing auto assignment and load balance


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9274aa81
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9274aa81
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9274aa81

Branch: refs/heads/helix-rebase
Commit: 9274aa815252455bbec2ad77cf9456b2351c439c
Parents: 3323b11
Author: shaofengshi <sh...@apache.org>
Authored: Wed Jan 13 12:00:48 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Mar 2 17:24:11 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/rest/constant/Constant.java    |  1 +
 .../kylin/rest/helix/HelixClusterAdmin.java     | 22 +++++++--
 .../helix/LeaderStandbyStateModelFactory.java   | 50 +++++++++++++++++++-
 3 files changed, 68 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/9274aa81/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/constant/Constant.java b/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
index f068e5f..58b74f0 100644
--- a/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
+++ b/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
@@ -41,6 +41,7 @@ public class Constant {
 
     public final static String SERVER_MODE_QUERY = "query";
     public final static String SERVER_MODE_JOB = "job";
+    public final static String SERVER_MODE_STREAM = "stream";
     public final static String SERVER_MODE_ALL = "all";
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9274aa81/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
index 9983aae..6300383 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -45,10 +45,12 @@ import java.util.concurrent.ConcurrentMap;
 public class HelixClusterAdmin {
 
     public static final String RESOURCE_NAME_JOB_ENGINE = "Resource_JobEngine";
+    public static final String RESOURCE_STREAME_CUBE_PREFIX = "Resource_Streame_";
 
     public static final String MODEL_LEADER_STANDBY = "LeaderStandby";
     public static final String MODEL_ONLINE_OFFLINE = "OnlineOffline";
     public static final String TAG_JOB_ENGINE = "Tag_JobEngine";
+    public static final String TAG_STREAM_BUILDER = "Tag_StreamBuilder";
 
     private static ConcurrentMap<KylinConfig, HelixClusterAdmin> instanceMaps = Maps.newConcurrentMap();
     private HelixManager participantManager;
@@ -74,11 +76,15 @@ public class HelixClusterAdmin {
 
         // use the tag to mark node's role.
         final List<String> instanceTags = Lists.newArrayList();
-        final boolean runJobEngine = Constant.SERVER_MODE_ALL.equalsIgnoreCase(kylinConfig.getServerMode()) || Constant.SERVER_MODE_JOB.equalsIgnoreCase(kylinConfig.getServerMode());
-        if (runJobEngine) {
+        if (Constant.SERVER_MODE_ALL.equalsIgnoreCase(kylinConfig.getServerMode())) {
             instanceTags.add(HelixClusterAdmin.TAG_JOB_ENGINE);
+            instanceTags.add(HelixClusterAdmin.TAG_STREAM_BUILDER);
+        } else if (Constant.SERVER_MODE_JOB.equalsIgnoreCase(kylinConfig.getServerMode())) {
+            instanceTags.add(HelixClusterAdmin.TAG_JOB_ENGINE);
+        } else if (Constant.SERVER_MODE_STREAM.equalsIgnoreCase(kylinConfig.getServerMode())) {
+            instanceTags.add(HelixClusterAdmin.TAG_STREAM_BUILDER);
         }
-
+        
         addInstance(instanceName, instanceTags);
         startInstance(instanceName);
 
@@ -108,6 +114,16 @@ public class HelixClusterAdmin {
         }
 
     }
+    
+    public void addStreamCubeSlice(String cubeName, long start, long end) {
+        String resourceName = RESOURCE_STREAME_CUBE_PREFIX + cubeName + "_" + start + "_" + end;
+        if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) {
+            admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
+        }
+
+        admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER);
+        
+    }
 
     /**
      * Start the instance and register the state model factory

http://git-wip-us.apache.org/repos/asf/kylin/blob/9274aa81/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index 6694c81..c2a78e7 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.rest.helix;
 
+import com.google.common.base.Preconditions;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.api.StateTransitionHandlerFactory;
 import org.apache.helix.api.TransitionHandler;
@@ -8,12 +9,16 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.engine.streaming.cli.StreamingCLI;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.lock.MockJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX;
+
 /**
  */
 public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
@@ -22,13 +27,19 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
     @Override
     public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
         if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) {
-            return new JobEngineStateModel();
+            return JobEngineStateModel.INSTANCE;
         }
-        
+
+        if (partitionId.getResourceId().stringify().startsWith(RESOURCE_STREAME_CUBE_PREFIX)) {
+            return StreamCubeStateModel.INSTANCE;
+        }
+
         return null;
     }
 
     public static class JobEngineStateModel extends TransitionHandler {
+        
+        public static JobEngineStateModel INSTANCE = new JobEngineStateModel();
 
         @Transition(to = "LEADER", from = "STANDBY")
         public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
@@ -67,4 +78,39 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
 
         }
     }
+
+    public static class StreamCubeStateModel extends TransitionHandler {
+        
+        public static StreamCubeStateModel INSTANCE = new StreamCubeStateModel();
+
+        @Transition(to = "LEADER", from = "STANDBY")
+        public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+            String resourceName = message.getResourceId().stringify();
+            Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
+            long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")) + 1);
+            String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
+            long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) + 1);
+            String cubeName = temp.substring(0, temp.lastIndexOf("_"));
+
+            final Runnable runnable = new OneOffStreamingBuilder(cubeName, start, end).build();
+            runnable.run();
+        }
+
+        @Transition(to = "STANDBY", from = "LEADER")
+        public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+           
+
+        }
+
+        @Transition(to = "STANDBY", from = "OFFLINE")
+        public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+           
+        }
+
+
+        @Transition(to = "OFFLINE", from = "STANDBY")
+        public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+           
+        }
+    }
 }