You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/11/09 09:37:50 UTC
incubator-kylin git commit: helix-based job engine
Repository: incubator-kylin
Updated Branches:
refs/heads/helix [created] 984306ae6
helix-based job engine
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/984306ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/984306ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/984306ae
Branch: refs/heads/helix
Commit: 984306ae6149f305d98bd2dc799a6df2bd9df156
Parents: a3397d0
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Nov 9 15:02:19 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Nov 9 15:03:54 2015 +0800
----------------------------------------------------------------------
pom.xml | 12 ++
server/pom.xml | 23 +++-
.../kylin/rest/controller/JobController.java | 7 ++
.../kylin/rest/helix/HelixJobEngineAdmin.java | 102 ++++++++++++++++
.../rest/helix/JobControllerConnector.java | 80 +++++++++++++
.../rest/helix/JobControllerConstants.java | 34 ++++++
.../helix/v1/AbstractJobEngineStateModelV1.java | 46 +++++++
.../rest/helix/v1/DefaultStateModelFactory.java | 38 ++++++
.../helix/v1/EmptyJobEngineStateModelV1.java | 62 ++++++++++
.../kylin/rest/helix/v1/JobEngineSMDV1.java | 69 +++++++++++
.../rest/helix/JobControllerConnectorTest.java | 120 +++++++++++++++++++
.../helix/v1/TestJobEngineStateModelV1.java | 83 +++++++++++++
.../rest/helix/v1/TestStateModelFactory.java | 38 ++++++
13 files changed, 713 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3c696f7..5be477a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,7 @@
org/apache/kylin/**/tools/**:**/*CLI.java
</sonar.jacoco.excludes>
+ <helix.version>0.6.5</helix.version>
</properties>
<licenses>
@@ -460,6 +461,17 @@
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>${helix.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-examples</artifactId>
+ <version>${helix.version}</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index f2f9e32..8581e40 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -222,7 +222,22 @@
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.101tec</groupId>
+ <artifactId>zkclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.github.sgroschupf</groupId>
+ <artifactId>zkclient</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+
<!-- spring aop -->
<dependency>
<groupId>org.aspectj</groupId>
@@ -428,6 +443,12 @@
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.101tec</groupId>
+ <artifactId>zkclient</artifactId>
+ <version>0.5</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index f6323ed..6a76987 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -61,6 +61,13 @@ public class JobController extends BasicController implements InitializingBean {
@Autowired
private JobLock jobLock;
+
+ /**
+ * Callback contract with two parameterless notifications, one for a
+ * scheduler start and one for a scheduler stop.
+ * <p>
+ * In this change it is implemented by AbstractJobEngineStateModelV1; the
+ * EmptyJobEngineStateModelV1 subclass calls {@code onStartScheduler()} on the
+ * STANDBY to LEADER transition and {@code onStopScheduler()} on the LEADER to
+ * STANDBY transition.
+ * NOTE(review): nothing in JobController itself invokes this listener in the
+ * visible hunk; presumably it will drive the job scheduler later - confirm.
+ */
+ public interface JobControllerListener {
+
+ /** Notification that the scheduler should be (or has been) started. */
+ void onStartScheduler();
+
+ /** Notification that the scheduler should be (or has been) stopped. */
+ void onStopScheduler();
+ }
/*
* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
new file mode 100644
index 0000000..803eed4
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.kylin.rest.helix.v1.JobEngineSMDV1;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static org.apache.kylin.rest.helix.JobControllerConstants.CLUSTER_NAME;
+import static org.apache.kylin.rest.helix.JobControllerConstants.RESOURCE_NAME;
+
+/**
+ * Administrative helper for the Helix cluster that coordinates the job engine.
+ * <p>
+ * Wraps a {@link ZKHelixAdmin} bound to one ZooKeeper address and offers cluster
+ * bootstrap, controller start-up, rebalancing and a human-readable state dump.
+ */
+public class HelixJobEngineAdmin {
+
+    private static final Logger logger = LoggerFactory.getLogger(HelixJobEngineAdmin.class);
+    private final String zkAddress;
+    private final ZKHelixAdmin admin;
+
+    /**
+     * @param zkAddress ZooKeeper connect string used both for the admin client
+     *                  and for controllers started by {@link #startControllers()}
+     */
+    public HelixJobEngineAdmin(String zkAddress) {
+        this.zkAddress = zkAddress;
+        this.admin = new ZKHelixAdmin(zkAddress);
+    }
+
+    /**
+     * Bootstraps the V1 layout: adds the cluster {@code CLUSTER_NAME}, registers the
+     * V1 job-engine state model definition when the cluster does not have it yet, and
+     * adds the single-partition resource {@code RESOURCE_NAME} in "AUTO" mode when it
+     * is not present yet.
+     */
+    public void initV1() {
+        final StateModelDefinition jobEngineSMDV1 = JobEngineSMDV1.getJobEngineStateModelDefinitionV1();
+        final String stateModelId = jobEngineSMDV1.getId();
+        admin.addCluster(CLUSTER_NAME, false);
+        if (admin.getStateModelDef(CLUSTER_NAME, stateModelId) == null) {
+            admin.addStateModelDef(CLUSTER_NAME, stateModelId, jobEngineSMDV1);
+        }
+        if (!admin.getResourcesInCluster(CLUSTER_NAME).contains(RESOURCE_NAME)) {
+            admin.addResource(CLUSTER_NAME, RESOURCE_NAME, 1, stateModelId, "AUTO");
+        }
+    }
+
+    /**
+     * Starts a standalone Helix controller for {@code CLUSTER_NAME}, named
+     * {@code "localhost_" + CONTROLLER_PORT}.
+     */
+    public void startControllers() {
+        HelixControllerMain.startHelixController(zkAddress, CLUSTER_NAME, "localhost_" + JobControllerConstants.CONTROLLER_PORT,
+                HelixControllerMain.STANDALONE);
+    }
+
+    /**
+     * Renders the external view of a resource as a tab-separated table: one column
+     * per instance in the cluster, one row per partition, "-" where an instance holds
+     * no state for the partition.
+     *
+     * @param msg          free text appended to the "CLUSTER STATE: " header line
+     * @param clusterName  cluster to inspect
+     * @param resourceName resource whose external view is rendered
+     * @return the rendered table, or the header followed by
+     *         "no participant joined yet" when the resource has no external view
+     */
+    public String collectStateInfo(String msg, String clusterName, String resourceName) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("CLUSTER STATE: ").append(msg).append("\n");
+        ExternalView resourceExternalView = admin.getResourceExternalView(clusterName, resourceName);
+        if (resourceExternalView == null) {
+            sb.append("no participant joined yet").append("\n");
+            return sb.toString();
+        }
+        final List<String> instancesInCluster = admin.getInstancesInCluster(clusterName);
+        // TreeSet gives a stable, sorted row order.
+        TreeSet<String> sortedSet = new TreeSet<String>(resourceExternalView.getPartitionSet());
+        sb.append("\t\t");
+        for (String instance : instancesInCluster) {
+            sb.append(instance).append("\t");
+        }
+        sb.append("\n");
+        for (String partitionName : sortedSet) {
+            sb.append(partitionName).append("\t");
+            // The state map depends only on the partition, so look it up once per row.
+            final Map<String, String> stateMap = resourceExternalView.getStateMap(partitionName);
+            for (String instance : instancesInCluster) {
+                if (stateMap != null && stateMap.containsKey(instance)) {
+                    sb.append(stateMap.get(instance)).append("\t\t");
+                } else {
+                    sb.append("-").append("\t\t");
+                }
+            }
+            sb.append("\n");
+        }
+        sb.append("###################################################################").append("\n");
+        return sb.toString();
+    }
+
+    /**
+     * Rebalances a resource across every instance currently in the cluster, using
+     * the instance count as the replica count, then logs the resulting ideal state.
+     *
+     * @param clusterName  cluster that owns the resource
+     * @param resourceName resource to rebalance
+     */
+    public void rebalance(String clusterName, String resourceName) {
+        final List<String> instancesInCluster = admin.getInstancesInCluster(clusterName);
+        admin.rebalance(clusterName, resourceName, instancesInCluster.size(), instancesInCluster);
+        // Report on the cluster/resource that was actually rebalanced, not the default constants.
+        logger.info("cluster:{} ideal state:{}", clusterName, admin.getResourceIdealState(clusterName, resourceName));
+        logger.info("cluster:{} instances:{}", clusterName, instancesInCluster);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
new file mode 100644
index 0000000..e5c18eb
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.rest.helix.v1.DefaultStateModelFactory;
+import org.apache.kylin.rest.helix.v1.JobEngineSMDV1;
+
+import static org.apache.kylin.rest.helix.JobControllerConstants.CLUSTER_NAME;
+
+/**
+ * Connects one job-engine node to the Helix cluster {@code CLUSTER_NAME} as a
+ * participant. The instance is identified as {@code hostname + "_" + port}.
+ */
+public class JobControllerConnector {
+
+    private final String instanceName;
+    private final HelixAdmin admin;
+    private final StateModelFactory<StateModel> stateModelFactory;
+    private final String zkAddress;
+    private final String hostname;
+    private final String port;
+
+    /** Participant manager; stays {@code null} until {@link #start()} has been called. */
+    private HelixManager manager;
+
+    /**
+     * @param hostname          host part of the instance name
+     * @param port              port part of the instance name
+     * @param zkAddress         ZooKeeper connect string
+     * @param stateModelFactory factory registered for the V1 job-engine state model
+     */
+    public JobControllerConnector(String hostname, String port, String zkAddress, StateModelFactory<StateModel> stateModelFactory) {
+        this.instanceName = hostname + "_" + port;
+        this.hostname = hostname;
+        this.port = port;
+        this.zkAddress = zkAddress;
+        this.admin = new ZKHelixAdmin(zkAddress);
+        this.stateModelFactory = stateModelFactory;
+    }
+
+    /**
+     * Adds this instance to the cluster configuration; does nothing when an instance
+     * with the same name is already listed.
+     */
+    public void register() {
+        if (admin.getInstancesInCluster(CLUSTER_NAME).contains(instanceName)) {
+            return;
+        }
+        final InstanceConfig config = new InstanceConfig(instanceName);
+        config.setHostName(hostname);
+        config.setPort(port);
+        admin.addInstance(CLUSTER_NAME, config);
+    }
+
+
+    /**
+     * Creates the participant manager, registers the state model factory under
+     * {@code JobEngineSMDV1.STATE_MODEL_NAME} and connects to the cluster.
+     *
+     * @throws Exception whatever the Helix manager raises while connecting
+     */
+    public void start() throws Exception {
+        manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceName,
+                InstanceType.PARTICIPANT, zkAddress);
+        manager.getStateMachineEngine()
+                .registerStateModelFactory(JobEngineSMDV1.STATE_MODEL_NAME, stateModelFactory);
+        manager.connect();
+    }
+
+    /** Disconnects the participant manager; a no-op when {@link #start()} was never called. */
+    public void stop() {
+        if (manager == null) {
+            return;
+        }
+        manager.disconnect();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
new file mode 100644
index 0000000..afc4177
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+/**
+ * Names and port numbers shared by the Helix-based job engine classes.
+ * Not instantiable.
+ */
+public final class JobControllerConstants {
+
+    /** Name of the Helix cluster. */
+    public static final String CLUSTER_NAME = "helix_job_engine";
+
+    /** Name of the Helix resource added to the cluster. */
+    public static final String RESOURCE_NAME = "job_engine_v1";
+
+    /** Port constant for participants. */
+    public static final int PARTICIPANT_PORT = 17077;
+
+    /** Port constant used to build the controller's instance name. */
+    public static final int CONTROLLER_PORT = 17078;
+
+    private JobControllerConstants() {
+        // constants holder
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
new file mode 100644
index 0000000..194e446
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.kylin.rest.controller.JobController;
+
+/**
+ * Base class for V1 job-engine state models. It carries the owning instance's
+ * name and declares one handler per transition between the LEADER, STANDBY and
+ * OFFLINE states, in addition to the scheduler callbacks inherited from
+ * {@link JobController.JobControllerListener}.
+ * <p>
+ * NOTE(review): the handler names follow Helix's {@code onBecome<To>From<From>}
+ * convention and are presumably located by name at runtime - do not rename them
+ * without confirming against the Helix state model documentation.
+ */
+abstract class AbstractJobEngineStateModelV1 extends StateModel implements JobController.JobControllerListener {
+
+    private final String instanceName;
+
+    /**
+     * @param instanceName name of the participant instance this model belongs to
+     */
+    public AbstractJobEngineStateModelV1(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    /** Handler for the OFFLINE to STANDBY transition. */
+    public abstract void onBecomeStandbyFromOffline(Message message, NotificationContext context);
+
+    /** Handler for the STANDBY to LEADER transition. */
+    public abstract void onBecomeLeaderFromStandby(Message message, NotificationContext context);
+
+    /** Handler for the LEADER to STANDBY transition. */
+    public abstract void onBecomeStandbyFromLeader(Message message, NotificationContext context);
+
+    /** Handler for the STANDBY to OFFLINE transition. */
+    public abstract void onBecomeOfflineFromStandby(Message message, NotificationContext context);
+
+    /** @return the instance name supplied at construction */
+    public final String getInstanceName() {
+        return instanceName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
new file mode 100644
index 0000000..ab6b395
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * State model factory that hands out {@link EmptyJobEngineStateModelV1} instances
+ * bound to a fixed instance name.
+ */
+public class DefaultStateModelFactory extends StateModelFactory<StateModel> {
+
+    private final String instanceName;
+
+    /**
+     * @param instanceName instance name passed to every state model this factory creates
+     */
+    public DefaultStateModelFactory(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    /**
+     * Creates a fresh state model; neither argument influences the result.
+     */
+    @Override
+    public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
+        return new EmptyJobEngineStateModelV1(instanceName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
new file mode 100644
index 0000000..469a634
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placeholder state model: every transition and scheduler callback only records
+ * that it was invoked. Becoming LEADER triggers {@link #onStartScheduler()} and
+ * stepping down to STANDBY triggers {@link #onStopScheduler()}.
+ * <p>
+ * Events go through SLF4J rather than standard output so they end up in the
+ * server log alongside the other Helix job-engine messages.
+ */
+public final class EmptyJobEngineStateModelV1 extends AbstractJobEngineStateModelV1 {
+
+    private static final Logger logger = LoggerFactory.getLogger(EmptyJobEngineStateModelV1.class);
+
+    /**
+     * @param instanceName name of the participant instance, used as the log prefix
+     */
+    public EmptyJobEngineStateModelV1(String instanceName) {
+        super(instanceName);
+    }
+
+    /** Logs the STANDBY to LEADER transition, then signals a scheduler start. */
+    @Override
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        logger.info("{} onBecomeLeaderFromStandby", getInstanceName());
+        onStartScheduler();
+    }
+
+    /** Logs the LEADER to STANDBY transition, then signals a scheduler stop. */
+    @Override
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        logger.info("{} onBecomeStandbyFromLeader", getInstanceName());
+        onStopScheduler();
+    }
+
+    /** Logs the STANDBY to OFFLINE transition. */
+    @Override
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+        logger.info("{} onBecomeOfflineFromStandby", getInstanceName());
+    }
+
+    /** Logs the OFFLINE to STANDBY transition. */
+    @Override
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+        logger.info("{} onBecomeStandbyFromOffline", getInstanceName());
+    }
+
+    /** Logs the scheduler-start callback; no scheduler is actually started here. */
+    @Override
+    public void onStartScheduler() {
+        logger.info("{} onStartScheduler", getInstanceName());
+    }
+
+    /** Logs the scheduler-stop callback; no scheduler is actually stopped here. */
+    @Override
+    public void onStopScheduler() {
+        logger.info("{} onStopScheduler", getInstanceName());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
new file mode 100644
index 0000000..4ccf9ae
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * Holder of the V1 job-engine state model definition: three states (LEADER,
+ * STANDBY, OFFLINE) plus Helix's built-in states, with at most one LEADER.
+ * Not instantiable.
+ */
+public final class JobEngineSMDV1 {
+
+    /** Name under which the state model is registered with Helix. */
+    public static final String STATE_MODEL_NAME = "job_engine_model_v1";
+
+    /** Application-level states of the model. */
+    public enum States {
+        LEADER,
+        STANDBY,
+        OFFLINE
+    }
+
+    private JobEngineSMDV1() {
+        // static access only
+    }
+
+    /** Builds the definition once, when the holder class is first initialized. */
+    private static class StateModelDefinitionV1Holder {
+        private static final StateModelDefinition INSTANCE = build();
+
+        private static StateModelDefinition build() {
+            StateModelDefinition.Builder builder = new StateModelDefinition.Builder(STATE_MODEL_NAME);
+            // init state
+            builder.initialState(States.OFFLINE.name());
+
+            // add states
+            // NOTE(review): STANDBY is given priority 2 and OFFLINE priority 1; Helix's
+            // stock leader/standby model ranks STANDBY ahead of OFFLINE - confirm this
+            // ordering is intentional.
+            builder.addState(States.LEADER.name(), 0);
+            builder.addState(States.STANDBY.name(), 2);
+            builder.addState(States.OFFLINE.name(), 1);
+            for (HelixDefinedState state : HelixDefinedState.values()) {
+                builder.addState(state.name());
+            }
+
+            // add transitions
+            builder.addTransition(States.LEADER.name(), States.STANDBY.name(), 0);
+            builder.addTransition(States.STANDBY.name(), States.LEADER.name(), 1);
+            builder.addTransition(States.OFFLINE.name(), States.STANDBY.name(), 2);
+            builder.addTransition(States.STANDBY.name(), States.OFFLINE.name(), 3);
+            builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());
+
+            // bounds: a single LEADER, STANDBY bounded dynamically by "R"
+            builder.upperBound(States.LEADER.name(), 1);
+            builder.dynamicUpperBound(States.STANDBY.name(), "R");
+
+            return builder.build();
+        }
+    }
+
+    /**
+     * @return the shared V1 state model definition; the same object on every call
+     */
+    public static StateModelDefinition getJobEngineStateModelDefinitionV1() {
+        return StateModelDefinitionV1Holder.INSTANCE;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java b/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
new file mode 100644
index 0000000..b6ddb95
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import com.google.common.collect.Lists;
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+
+import static org.apache.kylin.rest.helix.JobControllerConstants.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.rest.helix.v1.TestJobEngineStateModelV1;
+import org.apache.kylin.rest.helix.v1.TestStateModelFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Integration test for {@link JobControllerConnector}: starts an in-process
+ * ZooKeeper server and a Helix controller, then adds and removes participants
+ * while checking the leader/standby/offline counters kept by
+ * {@link TestJobEngineStateModelV1}.
+ */
+public class JobControllerConnectorTest {
+
+
+    String zkAddress = "localhost:2199";
+    ZkServer server;
+
+    List<JobControllerConnector> connectors ;
+    HelixJobEngineAdmin helixJobEngineAdmin;
+
+
+    @Before
+    public void setup() {
+        // start zookeeper on localhost
+        final File tmpDir = new File("/tmp/helix-quickstart");
+        FileUtil.fullyDelete(tmpDir);
+        tmpDir.mkdirs();
+        server = new ZkServer("/tmp/helix-quickstart/dataDir", "/tmp/helix-quickstart/logDir",
+                new IDefaultNameSpace() {
+                    @Override
+                    public void createDefaultNameSpace(ZkClient zkClient) {
+                    }
+                }, 2199);
+        server.start();
+
+        // The counters are static; clear whatever an earlier run in this JVM left behind.
+        TestJobEngineStateModelV1.getLeaderCount().set(0);
+        TestJobEngineStateModelV1.getStandbyCount().set(0);
+        TestJobEngineStateModelV1.getOfflineCount().set(0);
+
+        final ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkAddress);
+        zkHelixAdmin.dropCluster(CLUSTER_NAME);
+        connectors = Lists.newArrayList();
+        helixJobEngineAdmin = new HelixJobEngineAdmin(zkAddress);
+        helixJobEngineAdmin.initV1();
+        helixJobEngineAdmin.startControllers();
+
+    }
+
+    @Test
+    public void test() throws Exception {
+        JobControllerConnector connector = new JobControllerConnector("localhost", "10000", zkAddress, new TestStateModelFactory("localhost", "10000"));
+        connector.register();
+        helixJobEngineAdmin.rebalance(CLUSTER_NAME, RESOURCE_NAME);
+        connector.start();
+        connectors.add(connector);
+        Thread.sleep(1000);
+        System.out.println(helixJobEngineAdmin.collectStateInfo("add 1 nodes", CLUSTER_NAME, RESOURCE_NAME));
+        assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+        connector = new JobControllerConnector("localhost", "10001", zkAddress, new TestStateModelFactory("localhost", "10001"));
+        connector.register();
+        helixJobEngineAdmin.rebalance(CLUSTER_NAME, RESOURCE_NAME);
+        connector.start();
+        connectors.add(connector);
+        Thread.sleep(1000);
+        System.out.println(helixJobEngineAdmin.collectStateInfo("add 2 nodes", CLUSTER_NAME, RESOURCE_NAME));
+        assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(1, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+
+        // A stopped participant gets no transition callback, so the test lowers the
+        // leader counter by hand before checking that the survivor took over.
+        connectors.remove(0).stop();
+        TestJobEngineStateModelV1.getLeaderCount().decrementAndGet();
+        Thread.sleep(1000);
+        assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+        connectors.remove(0).stop();
+        TestJobEngineStateModelV1.getLeaderCount().decrementAndGet();
+        Thread.sleep(1000);
+        assertEquals(0, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+        Thread.sleep(2000);
+    }
+
+    @After
+    public void tearDown() {
+        try {
+            // Stop participants still registered because an assertion failed mid-test.
+            if (connectors != null) {
+                for (JobControllerConnector connector : connectors) {
+                    connector.stop();
+                }
+                connectors.clear();
+            }
+        } finally {
+            // Always release the ZooKeeper port, even when stopping a participant fails.
+            if (server != null) {
+                server.shutdown();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
new file mode 100644
index 0000000..0a5f108
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test state model that tracks, in JVM-wide counters, how many models are
+ * currently in each of the OFFLINE, STANDBY and LEADER states. A newly created
+ * model is counted as OFFLINE; each transition moves one unit between counters.
+ */
+public class TestJobEngineStateModelV1 extends AbstractJobEngineStateModelV1 {
+
+    private static AtomicInteger offlineCount = new AtomicInteger(0);
+    private static AtomicInteger standbyCount = new AtomicInteger(0);
+    private static AtomicInteger leaderCount = new AtomicInteger(0);
+
+    /**
+     * @param instanceName name of the participant instance this model belongs to
+     */
+    public TestJobEngineStateModelV1(String instanceName) {
+        super(instanceName);
+        // every model starts out OFFLINE
+        offlineCount.incrementAndGet();
+    }
+
+    /** @return the shared counter of models in STANDBY */
+    public static AtomicInteger getStandbyCount() {
+        return standbyCount;
+    }
+
+    /** @return the shared counter of models in OFFLINE */
+    public static AtomicInteger getOfflineCount() {
+        return offlineCount;
+    }
+
+    /** @return the shared counter of models in LEADER */
+    public static AtomicInteger getLeaderCount() {
+        return leaderCount;
+    }
+
+    /** STANDBY to LEADER: one more leader, one fewer standby. */
+    @Override
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        leaderCount.incrementAndGet();
+        standbyCount.decrementAndGet();
+    }
+
+    /** LEADER to STANDBY: one more standby, one fewer leader. */
+    @Override
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        standbyCount.incrementAndGet();
+        leaderCount.decrementAndGet();
+    }
+
+    /** STANDBY to OFFLINE: one more offline, one fewer standby. */
+    @Override
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+        offlineCount.incrementAndGet();
+        standbyCount.decrementAndGet();
+    }
+
+    /** OFFLINE to STANDBY: one more standby, one fewer offline. */
+    @Override
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+        standbyCount.incrementAndGet();
+        offlineCount.decrementAndGet();
+    }
+
+    /** Intentionally empty: the test only observes state transitions. */
+    @Override
+    public void onStartScheduler() {
+
+    }
+
+    /** Intentionally empty: the test only observes state transitions. */
+    @Override
+    public void onStopScheduler() {
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
new file mode 100644
index 0000000..ce0820b
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * State model factory for tests; produces {@link TestJobEngineStateModelV1}
+ * instances named {@code hostname + "_" + port}.
+ */
+public class TestStateModelFactory extends StateModelFactory<StateModel> {
+
+    private final String instanceName;
+
+    /**
+     * @param hostname host part of the instance name
+     * @param port     port part of the instance name
+     */
+    public TestStateModelFactory(String hostname, String port) {
+        this.instanceName = hostname + "_" + port;
+    }
+
+    /**
+     * Creates a fresh counting state model; neither argument influences the result.
+     */
+    @Override
+    public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
+        return new TestJobEngineStateModelV1(instanceName);
+    }
+}