You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/03 04:30:58 UTC
[11/25] kylin git commit: KYLIN-1311 Stream cubing auto assignment
and load balance
KYLIN-1311 Stream cubing auto assignment and load balance
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9274aa81
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9274aa81
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9274aa81
Branch: refs/heads/helix-rebase
Commit: 9274aa815252455bbec2ad77cf9456b2351c439c
Parents: 3323b11
Author: shaofengshi <sh...@apache.org>
Authored: Wed Jan 13 12:00:48 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Mar 2 17:24:11 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/rest/constant/Constant.java | 1 +
.../kylin/rest/helix/HelixClusterAdmin.java | 22 +++++++--
.../helix/LeaderStandbyStateModelFactory.java | 50 +++++++++++++++++++-
3 files changed, 68 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/9274aa81/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/constant/Constant.java b/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
index f068e5f..58b74f0 100644
--- a/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
+++ b/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
@@ -41,6 +41,7 @@ public class Constant {
public final static String SERVER_MODE_QUERY = "query";
public final static String SERVER_MODE_JOB = "job";
+ public final static String SERVER_MODE_STREAM = "stream";
public final static String SERVER_MODE_ALL = "all";
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9274aa81/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
index 9983aae..6300383 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -45,10 +45,12 @@ import java.util.concurrent.ConcurrentMap;
public class HelixClusterAdmin {
public static final String RESOURCE_NAME_JOB_ENGINE = "Resource_JobEngine";
+ public static final String RESOURCE_STREAME_CUBE_PREFIX = "Resource_Streame_";
public static final String MODEL_LEADER_STANDBY = "LeaderStandby";
public static final String MODEL_ONLINE_OFFLINE = "OnlineOffline";
public static final String TAG_JOB_ENGINE = "Tag_JobEngine";
+ public static final String TAG_STREAM_BUILDER = "Tag_StreamBuilder";
private static ConcurrentMap<KylinConfig, HelixClusterAdmin> instanceMaps = Maps.newConcurrentMap();
private HelixManager participantManager;
@@ -74,11 +76,15 @@ public class HelixClusterAdmin {
// use the tag to mark node's role.
final List<String> instanceTags = Lists.newArrayList();
- final boolean runJobEngine = Constant.SERVER_MODE_ALL.equalsIgnoreCase(kylinConfig.getServerMode()) || Constant.SERVER_MODE_JOB.equalsIgnoreCase(kylinConfig.getServerMode());
- if (runJobEngine) {
+ if (Constant.SERVER_MODE_ALL.equalsIgnoreCase(kylinConfig.getServerMode())) {
instanceTags.add(HelixClusterAdmin.TAG_JOB_ENGINE);
+ instanceTags.add(HelixClusterAdmin.TAG_STREAM_BUILDER);
+ } else if (Constant.SERVER_MODE_JOB.equalsIgnoreCase(kylinConfig.getServerMode())) {
+ instanceTags.add(HelixClusterAdmin.TAG_JOB_ENGINE);
+ } else if (Constant.SERVER_MODE_STREAM.equalsIgnoreCase(kylinConfig.getServerMode())) {
+ instanceTags.add(HelixClusterAdmin.TAG_STREAM_BUILDER);
}
-
+
addInstance(instanceName, instanceTags);
startInstance(instanceName);
@@ -108,6 +114,16 @@ public class HelixClusterAdmin {
}
}
+
+ public void addStreamCubeSlice(String cubeName, long start, long end) {
+ String resourceName = RESOURCE_STREAME_CUBE_PREFIX + cubeName + "_" + start + "_" + end;
+ if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) {
+ admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
+ }
+
+ admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER);
+
+ }
/**
* Start the instance and register the state model factory
http://git-wip-us.apache.org/repos/asf/kylin/blob/9274aa81/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index 6694c81..c2a78e7 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -1,5 +1,6 @@
package org.apache.kylin.rest.helix;
+import com.google.common.base.Preconditions;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.TransitionHandler;
@@ -8,12 +9,16 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.engine.streaming.cli.StreamingCLI;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.lock.MockJobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX;
+
/**
*/
public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
@@ -22,13 +27,19 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
@Override
public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) {
- return new JobEngineStateModel();
+ return JobEngineStateModel.INSTANCE;
}
-
+
+ if (partitionId.getResourceId().stringify().startsWith(RESOURCE_STREAME_CUBE_PREFIX)) {
+ return StreamCubeStateModel.INSTANCE;
+ }
+
return null;
}
public static class JobEngineStateModel extends TransitionHandler {
+
+ public static JobEngineStateModel INSTANCE = new JobEngineStateModel();
@Transition(to = "LEADER", from = "STANDBY")
public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
@@ -67,4 +78,39 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
}
}
+
+ public static class StreamCubeStateModel extends TransitionHandler {
+
+ public static StreamCubeStateModel INSTANCE = new StreamCubeStateModel();
+
+ @Transition(to = "LEADER", from = "STANDBY")
+ public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+ String resourceName = message.getResourceId().stringify();
+ Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
+ long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")) + 1);
+ String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
+ long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) + 1);
+ String cubeName = temp.substring(0, temp.lastIndexOf("_"));
+
+ final Runnable runnable = new OneOffStreamingBuilder(cubeName, start, end).build();
+ runnable.run();
+ }
+
+ @Transition(to = "STANDBY", from = "LEADER")
+ public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+
+
+ }
+
+ @Transition(to = "STANDBY", from = "OFFLINE")
+ public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+
+ }
+
+
+ @Transition(to = "OFFLINE", from = "STANDBY")
+ public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+
+ }
+ }
}