You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/11/09 09:37:50 UTC

incubator-kylin git commit: helix-based job engine

Repository: incubator-kylin
Updated Branches:
  refs/heads/helix [created] 984306ae6


helix-based job engine


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/984306ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/984306ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/984306ae

Branch: refs/heads/helix
Commit: 984306ae6149f305d98bd2dc799a6df2bd9df156
Parents: a3397d0
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Nov 9 15:02:19 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Nov 9 15:03:54 2015 +0800

----------------------------------------------------------------------
 pom.xml                                         |  12 ++
 server/pom.xml                                  |  23 +++-
 .../kylin/rest/controller/JobController.java    |   7 ++
 .../kylin/rest/helix/HelixJobEngineAdmin.java   | 102 ++++++++++++++++
 .../rest/helix/JobControllerConnector.java      |  80 +++++++++++++
 .../rest/helix/JobControllerConstants.java      |  34 ++++++
 .../helix/v1/AbstractJobEngineStateModelV1.java |  46 +++++++
 .../rest/helix/v1/DefaultStateModelFactory.java |  38 ++++++
 .../helix/v1/EmptyJobEngineStateModelV1.java    |  62 ++++++++++
 .../kylin/rest/helix/v1/JobEngineSMDV1.java     |  69 +++++++++++
 .../rest/helix/JobControllerConnectorTest.java  | 120 +++++++++++++++++++
 .../helix/v1/TestJobEngineStateModelV1.java     |  83 +++++++++++++
 .../rest/helix/v1/TestStateModelFactory.java    |  38 ++++++
 13 files changed, 713 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3c696f7..5be477a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,7 @@
             org/apache/kylin/**/tools/**:**/*CLI.java
         </sonar.jacoco.excludes>
 
+        <helix.version>0.6.5</helix.version>
     </properties>
 
     <licenses>
@@ -460,6 +461,17 @@
                 <artifactId>curator-recipes</artifactId>
                 <version>${curator.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.helix</groupId>
+                <artifactId>helix-core</artifactId>
+                <version>${helix.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.helix</groupId>
+                <artifactId>helix-examples</artifactId>
+                <version>${helix.version}</version>
+            </dependency>
+
 
         </dependencies>
     </dependencyManagement>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index f2f9e32..8581e40 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -222,7 +222,22 @@
             <groupId>com.h2database</groupId>
             <artifactId>h2</artifactId>
         </dependency>
-		
+        <dependency>
+            <groupId>org.apache.helix</groupId>
+            <artifactId>helix-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.101tec</groupId>
+                    <artifactId>zkclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.github.sgroschupf</groupId>
+                    <artifactId>zkclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+
         <!-- spring aop -->
         <dependency>
             <groupId>org.aspectj</groupId>
@@ -428,6 +443,12 @@
             <version>${jetty.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.101tec</groupId>
+            <artifactId>zkclient</artifactId>
+            <version>0.5</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index f6323ed..6a76987 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -61,6 +61,13 @@ public class JobController extends BasicController implements InitializingBean {
 
     @Autowired
     private JobLock jobLock;
+    
+    /**
+     * Callbacks for starting and stopping the job scheduler on this server.
+     */
+    public interface JobControllerListener {
+
+        /** Called when the job scheduler should start on this server. */
+        void onStartScheduler();
+
+        /** Called when the job scheduler should stop on this server. */
+        void onStopScheduler();
+    }
 
     /*
      * (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
new file mode 100644
index 0000000..803eed4
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.kylin.rest.helix.v1.JobEngineSMDV1;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static org.apache.kylin.rest.helix.JobControllerConstants.CLUSTER_NAME;
+import static org.apache.kylin.rest.helix.JobControllerConstants.RESOURCE_NAME;
+
+/**
+ * Administrative entry point for the Helix-based job engine cluster.
+ * <p>
+ * Sets up the cluster, the {@link JobEngineSMDV1} state model and the job engine resource, starts a
+ * standalone controller, triggers rebalances and renders the external view for diagnostics.
+ */
+public class HelixJobEngineAdmin {
+
+    private static final Logger logger = LoggerFactory.getLogger(HelixJobEngineAdmin.class);
+    private final String zkAddress;
+    private final ZKHelixAdmin admin;
+
+    /**
+     * @param zkAddress ZooKeeper connect string of the Helix cluster, e.g. {@code localhost:2199}
+     */
+    public HelixJobEngineAdmin(String zkAddress) {
+        this.zkAddress = zkAddress;
+        this.admin = new ZKHelixAdmin(zkAddress);
+    }
+
+    /**
+     * Creates the job engine cluster, registers the v1 state model and adds the single-partition
+     * {@code RESOURCE_NAME} resource in AUTO mode. An existing state model or resource is left
+     * untouched, so calling this repeatedly is safe.
+     */
+    public void initV1() {
+        final StateModelDefinition jobEngineSMDV1 = JobEngineSMDV1.getJobEngineStateModelDefinitionV1();
+        // recreateIfExists = false: keep an existing cluster and its data
+        admin.addCluster(CLUSTER_NAME, false);
+        if (admin.getStateModelDef(CLUSTER_NAME, jobEngineSMDV1.getId()) == null) {
+            admin.addStateModelDef(CLUSTER_NAME, jobEngineSMDV1.getId(), jobEngineSMDV1);
+        }
+        if (!admin.getResourcesInCluster(CLUSTER_NAME).contains(RESOURCE_NAME)) {
+            admin.addResource(CLUSTER_NAME, RESOURCE_NAME, 1, jobEngineSMDV1.getId(), "AUTO");
+        }
+    }
+
+    /**
+     * Starts a standalone Helix controller for the job engine cluster, named
+     * {@code localhost_<CONTROLLER_PORT>}.
+     */
+    public void startControllers() {
+        // NOTE(review): the controller handle returned by Helix is discarded, so nothing can stop it later
+        HelixControllerMain.startHelixController(zkAddress, CLUSTER_NAME, "localhost_" + JobControllerConstants.CONTROLLER_PORT,
+                HelixControllerMain.STANDALONE);
+    }
+
+    /**
+     * Renders the external view of a resource as a text table: partitions as rows, cluster
+     * instances as columns, {@code -} where an instance holds no state for a partition.
+     *
+     * @param msg          label printed in the header line
+     * @param clusterName  cluster to inspect
+     * @param resourceName resource whose external view is rendered
+     * @return the formatted table, or a short notice if the resource has no external view yet
+     */
+    public String collectStateInfo(String msg, String clusterName, String resourceName) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("CLUSTER STATE: ").append(msg).append("\n");
+        ExternalView resourceExternalView = admin.getResourceExternalView(clusterName, resourceName);
+        if (resourceExternalView == null) {
+            sb.append("no participant joined yet").append("\n");
+            return sb.toString();
+        }
+        final List<String> instancesInCluster = admin.getInstancesInCluster(clusterName);
+        TreeSet<String> sortedSet = new TreeSet<String>(resourceExternalView.getPartitionSet());
+        sb.append("\t\t");
+        for (String instance : instancesInCluster) {
+            sb.append(instance).append("\t");
+        }
+        sb.append("\n");
+        for (String partitionName : sortedSet) {
+            sb.append(partitionName).append("\t");
+            // the state map depends only on the partition, so fetch it once per row
+            final Map<String, String> stateMap = resourceExternalView.getStateMap(partitionName);
+            for (String instance : instancesInCluster) {
+                if (stateMap != null && stateMap.containsKey(instance)) {
+                    sb.append(stateMap.get(instance)).append("\t\t");
+                } else {
+                    sb.append("-").append("\t\t");
+                }
+            }
+            sb.append("\n");
+        }
+        sb.append("###################################################################").append("\n");
+        return sb.toString();
+    }
+
+    /**
+     * Rebalances a resource across every instance currently in the cluster, using one replica per
+     * instance, and logs the resulting ideal state.
+     *
+     * @param clusterName  cluster containing the resource
+     * @param resourceName resource to rebalance
+     */
+    public void rebalance(String clusterName, String resourceName) {
+        final List<String> instancesInCluster = admin.getInstancesInCluster(clusterName);
+        admin.rebalance(clusterName, resourceName, instancesInCluster.size(), instancesInCluster);
+        // log the cluster/resource actually rebalanced, not the default constants
+        logger.info("cluster:{} ideal state:{}", clusterName, admin.getResourceIdealState(clusterName, resourceName));
+        logger.info("cluster:{} instances:{}", clusterName, instancesInCluster);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
new file mode 100644
index 0000000..e5c18eb
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.rest.helix.v1.DefaultStateModelFactory;
+import org.apache.kylin.rest.helix.v1.JobEngineSMDV1;
+
+import static org.apache.kylin.rest.helix.JobControllerConstants.CLUSTER_NAME;
+
+/**
+ * Connects this server to the Helix job engine cluster as a participant named {@code hostname_port}.
+ * <p>
+ * {@link #register()} adds the instance to the cluster configuration, {@link #start()} joins the
+ * cluster with the supplied state model factory, and {@link #stop()} leaves it again.
+ */
+public class JobControllerConnector {
+
+    private final String instanceName;
+    private final HelixAdmin admin;
+    private final StateModelFactory<StateModel> stateModelFactory;
+    private final String zkAddress;
+    private final String hostname;
+    private final String port;
+
+    // non-null between start() and stop()
+    private HelixManager manager;
+
+    /**
+     * @param hostname          host part of the participant instance name
+     * @param port              port part of the participant instance name
+     * @param zkAddress         ZooKeeper connect string of the Helix cluster
+     * @param stateModelFactory factory for the {@link JobEngineSMDV1} state models of this participant
+     */
+    public JobControllerConnector(String hostname, String port, String zkAddress, StateModelFactory<StateModel> stateModelFactory) {
+        this.instanceName = hostname + "_" + port;
+        this.hostname = hostname;
+        this.port = port;
+        this.zkAddress = zkAddress;
+        this.admin = new ZKHelixAdmin(zkAddress);
+        this.stateModelFactory = stateModelFactory;
+    }
+
+    /**
+     * Adds this instance to the cluster configuration unless it is already registered.
+     */
+    public void register() {
+        if (!admin.getInstancesInCluster(CLUSTER_NAME).contains(instanceName)) {
+            InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+            instanceConfig.setHostName(hostname);
+            instanceConfig.setPort(port);
+            admin.addInstance(CLUSTER_NAME, instanceConfig);
+        }
+    }
+
+    /**
+     * Connects to the cluster as a participant and registers the state model factory for
+     * {@link JobEngineSMDV1#STATE_MODEL_NAME}.
+     *
+     * @throws IllegalStateException if this connector is already connected
+     * @throws Exception             if Helix fails to connect
+     */
+    public void start() throws Exception {
+        if (manager != null && manager.isConnected()) {
+            throw new IllegalStateException("participant " + instanceName + " is already connected");
+        }
+        // release a manager left over from an earlier failed connect before creating a new one
+        stop();
+        this.manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceName,
+                InstanceType.PARTICIPANT, zkAddress);
+        StateMachineEngine stateMach = manager.getStateMachineEngine();
+        stateMach.registerStateModelFactory(JobEngineSMDV1.STATE_MODEL_NAME, stateModelFactory);
+        // assigned before connect() so that stop() can clean up after a partial connect
+        manager.connect();
+    }
+
+    /**
+     * Disconnects from the cluster. Safe to call repeatedly or before {@link #start()}.
+     */
+    public void stop() {
+        if (manager != null) {
+            manager.disconnect();
+            manager = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
new file mode 100644
index 0000000..afc4177
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+/**
+ * Names and ports shared by the Helix-based job engine.
+ */
+public final class JobControllerConstants {
+
+    /** Name of the Helix cluster hosting the job engine. */
+    public static final String CLUSTER_NAME = "helix_job_engine";
+
+    /** Name of the Helix resource driven by the v1 job engine state model. */
+    public static final String RESOURCE_NAME = "job_engine_v1";
+
+    /** Participant port constant (NOTE(review): not referenced by the code in this change). */
+    public static final int PARTICIPANT_PORT = 17077;
+
+    /** Port used to build the standalone controller's instance name ({@code localhost_<port>}). */
+    public static final int CONTROLLER_PORT = 17078;
+
+    private JobControllerConstants() {
+        // constants holder; never instantiated
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
new file mode 100644
index 0000000..194e446
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.kylin.rest.controller.JobController;
+
+/**
+ * Base Helix participant state model for the v1 job engine; {@link JobEngineSMDV1} defines its
+ * states and transitions.
+ * <p>
+ * Helix resolves a transition to the method named {@code onBecome<To>From<From>} taking
+ * {@code (Message, NotificationContext)}, so every transition declared by {@link JobEngineSMDV1}
+ * needs such a method. Implementations also serve as {@link JobController.JobControllerListener}s.
+ */
+abstract class AbstractJobEngineStateModelV1 extends StateModel implements JobController.JobControllerListener {
+
+    private final String instanceName;
+
+    /**
+     * @param instanceName name of the Helix participant instance owning this state model
+     */
+    public AbstractJobEngineStateModelV1(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    /**
+     * @return the participant instance name passed to the constructor
+     */
+    public final String getInstanceName() {
+        return instanceName;
+    }
+
+    /** STANDBY -> LEADER transition. */
+    public abstract void onBecomeLeaderFromStandby(Message message, NotificationContext context);
+
+    /** LEADER -> STANDBY transition. */
+    public abstract void onBecomeStandbyFromLeader(Message message, NotificationContext context);
+
+    /** STANDBY -> OFFLINE transition. */
+    public abstract void onBecomeOfflineFromStandby(Message message, NotificationContext context);
+
+    /** OFFLINE -> STANDBY transition. */
+    public abstract void onBecomeStandbyFromOffline(Message message, NotificationContext context);
+
+    /**
+     * OFFLINE -> DROPPED transition, declared by {@link JobEngineSMDV1}, which Helix issues when the
+     * replica is removed from this participant. Without a handler Helix cannot execute it. The
+     * default does nothing; override to release per-replica resources.
+     */
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
new file mode 100644
index 0000000..ab6b395
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Creates a fresh {@link EmptyJobEngineStateModelV1} for every partition handed to this participant.
+ */
+public class DefaultStateModelFactory extends StateModelFactory<StateModel> {
+
+    private final String instanceName;
+
+    /**
+     * @param instanceName participant instance name given to every created state model
+     */
+    public DefaultStateModelFactory(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    /**
+     * Returns a new {@link EmptyJobEngineStateModelV1}; {@code resourceName} and
+     * {@code stateUnitKey} are not used.
+     */
+    @Override
+    public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
+        return new EmptyJobEngineStateModelV1(instanceName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
new file mode 100644
index 0000000..469a634
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+
+/**
+ * Placeholder v1 job engine state model that only logs each transition.
+ * <p>
+ * Becoming LEADER calls {@link #onStartScheduler()} and stepping down to STANDBY calls
+ * {@link #onStopScheduler()}; both hooks only log for now.
+ */
+public final class EmptyJobEngineStateModelV1 extends AbstractJobEngineStateModelV1 {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EmptyJobEngineStateModelV1.class);
+
+    /**
+     * @param instanceName participant instance name included in every log line
+     */
+    public EmptyJobEngineStateModelV1(String instanceName) {
+        super(instanceName);
+    }
+
+    @Override
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        logger.info("{} onBecomeLeaderFromStandby", getInstanceName());
+        onStartScheduler();
+    }
+
+    @Override
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        logger.info("{} onBecomeStandbyFromLeader", getInstanceName());
+        onStopScheduler();
+    }
+
+    @Override
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+        logger.info("{} onBecomeOfflineFromStandby", getInstanceName());
+    }
+
+    @Override
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+        logger.info("{} onBecomeStandbyFromOffline", getInstanceName());
+    }
+
+    @Override
+    public void onStartScheduler() {
+        logger.info("{} onStartScheduler", getInstanceName());
+    }
+
+    @Override
+    public void onStopScheduler() {
+        logger.info("{} onStopScheduler", getInstanceName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
new file mode 100644
index 0000000..4ccf9ae
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ */
+public final class JobEngineSMDV1 {
+
+    public static final String STATE_MODEL_NAME = "job_engine_model_v1";
+
+    public enum States {
+        LEADER,
+        STANDBY,
+        OFFLINE
+    }
+    
+    private static class StateModelDefinitionV1Holder {
+        private static StateModelDefinition instance = build();
+        private static StateModelDefinition build() {
+            StateModelDefinition.Builder builder = new StateModelDefinition.Builder(STATE_MODEL_NAME);
+            // init state
+            builder.initialState(States.OFFLINE.name());
+
+            // add states
+            builder.addState(States.LEADER.name(), 0);
+            builder.addState(States.STANDBY.name(), 2);
+            builder.addState(States.OFFLINE.name(), 1);
+            for (HelixDefinedState state : HelixDefinedState.values()) {
+                builder.addState(state.name());
+            }
+
+            // add transitions
+            builder.addTransition(States.LEADER.name(), States.STANDBY.name(), 0);
+            builder.addTransition(States.STANDBY.name(), States.LEADER.name(), 1);
+            builder.addTransition(States.OFFLINE.name(), States.STANDBY.name(), 2);
+            builder.addTransition(States.STANDBY.name(), States.OFFLINE.name(), 3);
+            builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());
+
+            // bounds
+            builder.upperBound(States.LEADER.name(), 1);
+            builder.dynamicUpperBound(States.STANDBY.name(), "R");
+
+            return builder.build();
+        }
+    }
+    
+    public static StateModelDefinition getJobEngineStateModelDefinitionV1() {
+        return StateModelDefinitionV1Holder.instance;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java b/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
new file mode 100644
index 0000000..b6ddb95
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import com.google.common.collect.Lists;
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+
+import static org.apache.kylin.rest.helix.JobControllerConstants.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.rest.helix.v1.TestJobEngineStateModelV1;
+import org.apache.kylin.rest.helix.v1.TestStateModelFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Integration test for {@link JobControllerConnector}: starts an embedded ZooKeeper on port 2199 and
+ * a standalone Helix controller, then joins and removes two participants while checking the leader,
+ * standby and offline counters kept by {@link TestJobEngineStateModelV1}.
+ */
+public class JobControllerConnectorTest {
+
+
+    String zkAddress = "localhost:2199";
+    ZkServer server;
+
+    List<JobControllerConnector> connectors;
+    HelixJobEngineAdmin helixJobEngineAdmin;
+
+
+    @Before
+    public void setup() {
+        // start zookeeper on localhost
+        final File tmpDir = new File("/tmp/helix-quickstart");
+        FileUtil.fullyDelete(tmpDir);
+        tmpDir.mkdirs();
+        server = new ZkServer("/tmp/helix-quickstart/dataDir", "/tmp/helix-quickstart/logDir",
+                new IDefaultNameSpace() {
+                    @Override
+                    public void createDefaultNameSpace(ZkClient zkClient) {
+                    }
+                }, 2199);
+        server.start();
+
+        final ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkAddress);
+        zkHelixAdmin.dropCluster(CLUSTER_NAME);
+        connectors = Lists.newArrayList();
+        helixJobEngineAdmin = new HelixJobEngineAdmin(zkAddress);
+        helixJobEngineAdmin.initV1();
+        helixJobEngineAdmin.startControllers();
+
+    }
+
+    @Test
+    public void test() throws Exception {
+        JobControllerConnector connector = new JobControllerConnector("localhost", "10000", zkAddress, new TestStateModelFactory("localhost", "10000"));
+        connector.register();
+        helixJobEngineAdmin.rebalance(CLUSTER_NAME, RESOURCE_NAME);
+        connector.start();
+        connectors.add(connector);
+        Thread.sleep(1000);
+        System.out.println(helixJobEngineAdmin.collectStateInfo("add 1 nodes", CLUSTER_NAME, RESOURCE_NAME));
+        assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+        connector = new JobControllerConnector("localhost", "10001", zkAddress, new TestStateModelFactory("localhost", "10001"));
+        connector.register();
+        helixJobEngineAdmin.rebalance(CLUSTER_NAME, RESOURCE_NAME);
+        connector.start();
+        connectors.add(connector);
+        Thread.sleep(1000);
+        System.out.println(helixJobEngineAdmin.collectStateInfo("add 2 nodes", CLUSTER_NAME, RESOURCE_NAME));
+        assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(1, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+        // presumably a disconnected leader gets no LEADER->STANDBY callback, so its count is
+        // removed by hand here (NOTE(review): confirm)
+        connectors.remove(0).stop();
+        TestJobEngineStateModelV1.getLeaderCount().decrementAndGet();
+        Thread.sleep(1000);
+        assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+        connectors.remove(0).stop();
+        TestJobEngineStateModelV1.getLeaderCount().decrementAndGet();
+        Thread.sleep(1000);
+        assertEquals(0, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+        Thread.sleep(2000);
+    }
+
+    @After
+    public void tearDown() {
+        try {
+            // disconnect participants a failed assertion left behind so they do not outlive this test;
+            // null if setup() failed before creating the list
+            if (connectors != null) {
+                for (JobControllerConnector connector : connectors) {
+                    connector.stop();
+                }
+                connectors.clear();
+            }
+        } finally {
+            if (server != null) {
+                server.shutdown();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
new file mode 100644
index 0000000..0a5f108
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test-only job engine state model that tallies, in JVM-wide static counters,
+ * how many state model instances are in the OFFLINE, STANDBY and LEADER states,
+ * so tests can assert on how the states are distributed across the cluster.
+ *
+ * <p>The counters are shared by every instance in the JVM and are never cleared
+ * automatically; call {@link #reset()} before each test so counts from one test
+ * do not leak into the next.
+ */
+public class TestJobEngineStateModelV1 extends AbstractJobEngineStateModelV1 {
+
+    // The references are final; only the counts they hold change.
+    private static final AtomicInteger offlineCount = new AtomicInteger(0);
+    private static final AtomicInteger standbyCount = new AtomicInteger(0);
+    private static final AtomicInteger leaderCount = new AtomicInteger(0);
+
+    /**
+     * Creates a state model for the given instance and counts it as OFFLINE.
+     *
+     * @param instanceName participant instance name, forwarded to the superclass
+     */
+    public TestJobEngineStateModelV1(String instanceName) {
+        super(instanceName);
+        offlineCount.incrementAndGet();
+    }
+
+    /** Moves one model from the STANDBY tally to the LEADER tally. */
+    @Override
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        leaderCount.incrementAndGet();
+        standbyCount.decrementAndGet();
+    }
+
+    /** Moves one model from the LEADER tally to the STANDBY tally. */
+    @Override
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        standbyCount.incrementAndGet();
+        leaderCount.decrementAndGet();
+    }
+
+    /** Moves one model from the STANDBY tally to the OFFLINE tally. */
+    @Override
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+        offlineCount.incrementAndGet();
+        standbyCount.decrementAndGet();
+    }
+
+    /** Moves one model from the OFFLINE tally to the STANDBY tally. */
+    @Override
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+        standbyCount.incrementAndGet();
+        offlineCount.decrementAndGet();
+    }
+
+    /**
+     * Resets all three counters to zero. Because the counters are static, they
+     * otherwise accumulate across every test that runs in the same JVM.
+     */
+    public static void reset() {
+        offlineCount.set(0);
+        standbyCount.set(0);
+        leaderCount.set(0);
+    }
+
+    /** Returns the live STANDBY counter (not a copy), so callers may also adjust it. */
+    public static AtomicInteger getStandbyCount() {
+        return standbyCount;
+    }
+
+    /** Returns the live OFFLINE counter (not a copy), so callers may also adjust it. */
+    public static AtomicInteger getOfflineCount() {
+        return offlineCount;
+    }
+
+    /** Returns the live LEADER counter (not a copy), so callers may also adjust it. */
+    public static AtomicInteger getLeaderCount() {
+        return leaderCount;
+    }
+
+    /** No-op: this test model only tracks state counts and starts no scheduler. */
+    @Override
+    public void onStartScheduler() {
+    }
+
+    /** No-op: this test model only tracks state counts and stops no scheduler. */
+    @Override
+    public void onStopScheduler() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/984306ae/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
new file mode 100644
index 0000000..ce0820b
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Test factory that hands out a new {@link TestJobEngineStateModelV1} on every call,
+ * all bound to the same participant instance name.
+ */
+public class TestStateModelFactory extends StateModelFactory<StateModel> {
+
+    /** Name given to every created model, formatted as {@code hostname_port}. */
+    private final String instanceName;
+
+    /**
+     * @param hostname host part of the instance name
+     * @param port     port part of the instance name
+     */
+    public TestStateModelFactory(String hostname, String port) {
+        instanceName = hostname + '_' + port;
+    }
+
+    /**
+     * Returns a fresh test state model. {@code resourceName} and {@code stateUnitKey}
+     * are ignored: every model is built from the same instance name.
+     */
+    @Override
+    public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
+        return new TestJobEngineStateModelV1(instanceName);
+    }
+}