You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/14 03:53:21 UTC

[05/15] kylin git commit: KYLIN-1188 use helix 0.7.1 to manage the job engine assignment

KYLIN-1188 use helix 0.7.1 to manage the job engine assignment


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ceec8980
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ceec8980
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ceec8980

Branch: refs/heads/helix-201602
Commit: ceec8980b5a02a5f2e2f5f8f8f34ded4b708a638
Parents: b26d957
Author: shaofengshi <sh...@apache.org>
Authored: Tue Jan 12 15:07:25 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Feb 6 13:31:49 2016 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                     |  16 +-
 .../apache/kylin/common/KylinConfigBase.java    |  28 +++
 .../test_case_data/sandbox/kylin.properties     |   2 +
 pom.xml                                         |   1 +
 .../kylin/rest/controller/JobController.java    |  33 +--
 .../kylin/rest/helix/HelixClusterAdmin.java     | 245 +++++++++++++++++++
 .../helix/LeaderStandbyStateModelFactory.java   |  70 ++++++
 .../apache/kylin/rest/service/CubeService.java  |   6 +-
 .../kylin/rest/helix/HelixClusterAdminTest.java | 140 +++++++++++
 9 files changed, 516 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 44a282e..8456ecb 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -1,12 +1,24 @@
-## Config for Kylin Engine ##
+## Cluster related properties ##
+# Required: comma-separated list of ZooKeeper servers (host:port)
+kylin.zookeeper.address=
 
+# REST address of this instance, in the form <host>:<port>;
+# currently required: no <hostname>:7070 default is applied and cluster registration fails if it is empty
+kylin.rest.address=
+
+# whether to run an embedded Helix cluster controller on this node
+kylin.cluster.controller=true
 
 # optional information for the owner of kylin platform, it can be your team's email
 # currently it will be attached to each kylin's htable attribute
 kylin.owner=whoami@kylin.apache.org
 
 # List of web servers in use, this enables one web server instance to sync up with other servers.
-kylin.rest.servers=localhost:7070
+# Deprecated: the cluster discovers live servers and updates this value automatically.
+# kylin.rest.servers=localhost:7070
+
+# Server mode: all, job, query
+kylin.server.mode=all
 
 # The metadata store in hbase
 kylin.metadata.url=kylin_metadata@hbase

http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5ce4ddc..a36b977 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -530,6 +530,34 @@ public class KylinConfigBase implements Serializable {
         return getOptional("mail.sender", "");
     }
 
+    public String getZookeeperAddress() { // ZooKeeper address(es) Helix connects to, from "kylin.zookeeper.address"
+        return this.getOptional("kylin.zookeeper.address"); // NOTE(review): presumably "" rather than null when set to an empty value — confirm getOptional
+    }
+    /** Sets "kylin.zookeeper.address" via setProperty. */
+    public void setZookeeperAddress(String zkAddress) {
+        setProperty("kylin.zookeeper.address", zkAddress);
+    }
+    /** Helix cluster name from "kylin.cluster.name", falling back to getMetadataUrlPrefix(). */
+    public String getClusterName() {
+        return this.getOptional("kylin.cluster.name", getMetadataUrlPrefix());
+    }
+    /** Sets "kylin.cluster.name" via setProperty. */
+    public void setClusterName(String clusterName) {
+        setProperty("kylin.cluster.name", clusterName);
+    }
+    /** Whether this node should start an embedded cluster controller; "kylin.cluster.controller" defaults to "true". */
+    public boolean isClusterController() {
+        return Boolean.parseBoolean(getOptional("kylin.cluster.controller", "true"));
+    }
+    /** REST address of this instance from "kylin.rest.address", expected as host:port. */
+    public String getRestAddress() {
+        return this.getOptional("kylin.rest.address");
+    }
+    /** Sets "kylin.rest.address" via setProperty. */
+    public void setRestAddress(String restAddress) {
+        setProperty("kylin.rest.address", restAddress);
+    }
+    
     public String toString() {
         return getMetadataUrl();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 18ff1cc..5ce636b 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -11,6 +11,8 @@ kylin.rest.servers=localhost:7070
 #set display timezone on UI,format like[GMT+N or GMT-N]
 kylin.rest.timezone=GMT-8
 
+kylin.server.mode=all
+
 # The metadata store in hbase
 kylin.metadata.url=kylin_metadata@hbase
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 21fd8aa..8f04dcd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,7 @@
             org/apache/kylin/**/tools/**:**/*CLI.java
         </sonar.jacoco.excludes>
 
+        <helix.version>0.7.1</helix.version>
     </properties>
 
     <licenses>

http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 9dfb594..741b5ee 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -26,15 +26,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
-import com.google.common.collect.Lists;
-import joptsimple.internal.Strings;
+import com.google.common.base.Preconditions;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.rest.exception.InternalErrorException;
-import org.apache.kylin.rest.helix.HelixJobEngineAdmin;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
 import org.apache.kylin.rest.request.JobListRequest;
 import org.apache.kylin.rest.service.JobService;
 import org.slf4j.Logger;
@@ -51,8 +49,6 @@ import java.io.IOException;
 import java.util.*;
 
 /**
- * @author ysong1
- * @author Jack
  * 
  */
 @Controller
@@ -76,9 +72,19 @@ public class JobController extends BasicController implements InitializingBean {
         TimeZone tzone = TimeZone.getTimeZone(timeZone);
         TimeZone.setDefault(tzone);
 
-        final String instanceName = HelixJobEngineAdmin.getCurrentInstanceName();
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
 
+        final String zkAddress = kylinConfig.getZookeeperAddress(); // blank (as shipped in kylin.properties) must fail fast, not only null
+        Preconditions.checkState(zkAddress != null && !zkAddress.trim().isEmpty(), "'kylin.zookeeper.address' couldn't be empty, set it in kylin.properties.");
+        final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
+        clusterAdmin.start();
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                clusterAdmin.stop();
+            }
+        }));
+
     }
 
     /**
@@ -190,17 +196,4 @@ public class JobController extends BasicController implements InitializingBean {
         this.jobService = jobService;
     }
 
-    private void updateKylinCluster(List<String> instances) {
-        List<String> instanceRestAddresses = Lists.newArrayList();
-        for (String instanceName : instances) {
-            int indexOfUnderscore = instanceName.lastIndexOf("_");
-            instanceRestAddresses.add(instanceName.substring(0, indexOfUnderscore) + ":" + instanceName.substring(indexOfUnderscore + 1));
-        }
-        String restServersInCluster = Strings.join(instanceRestAddresses, ",");
-        KylinConfig.getInstanceFromEnv().setProperty("kylin.rest.servers", restServersInCluster);
-        System.setProperty("kylin.rest.servers", restServersInCluster);
-        Broadcaster.clearCache();
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
new file mode 100644
index 0000000..9983aae
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import joptsimple.internal.Strings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.*;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.*;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.rest.constant.Constant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Administrator of Kylin cluster
+ */
+public class HelixClusterAdmin {
+
+    public static final String RESOURCE_NAME_JOB_ENGINE = "Resource_JobEngine";
+
+    public static final String MODEL_LEADER_STANDBY = "LeaderStandby";
+    public static final String MODEL_ONLINE_OFFLINE = "OnlineOffline";
+    public static final String TAG_JOB_ENGINE = "Tag_JobEngine";
+
+    private static final ConcurrentMap<KylinConfig, HelixClusterAdmin> instanceMaps = Maps.newConcurrentMap();
+    private HelixManager participantManager;
+    private HelixManager controllerManager;
+
+    private final KylinConfig kylinConfig;
+
+    private static final Logger logger = LoggerFactory.getLogger(HelixClusterAdmin.class);
+    private final String zkAddress;
+    private final ZKHelixAdmin admin;
+    private final String clusterName;
+
+    private HelixClusterAdmin(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+        this.zkAddress = kylinConfig.getZookeeperAddress();
+        this.clusterName = kylinConfig.getClusterName();
+        this.admin = new ZKHelixAdmin(zkAddress);
+    }
+    /** Joins the cluster: initializes it, registers and connects this instance with role tags, rebalances, and optionally starts a controller. */
+    public void start() throws Exception {
+        initCluster();
+        final String instanceName = getCurrentInstanceName();
+
+        // tag the instance with its roles: only the ALL and JOB server modes run the job engine
+        final List<String> instanceTags = Lists.newArrayList();
+        final boolean runJobEngine = Constant.SERVER_MODE_ALL.equalsIgnoreCase(kylinConfig.getServerMode()) || Constant.SERVER_MODE_JOB.equalsIgnoreCase(kylinConfig.getServerMode());
+        if (runJobEngine) {
+            instanceTags.add(HelixClusterAdmin.TAG_JOB_ENGINE);
+        }
+
+        addInstance(instanceName, instanceTags);
+        startInstance(instanceName);
+
+        rebalanceWithTag(instanceTags);
+
+        boolean startController = kylinConfig.isClusterController();
+        if (startController) {
+            startController();
+        }
+    }
+
+    /**
+     * Create the cluster if absent, register the OnlineOffline and LeaderStandby state models, and add the single-partition SEMI_AUTO job engine resource
+     */
+    protected void initCluster() {
+        admin.addCluster(clusterName, false);
+        if (admin.getStateModelDef(clusterName, MODEL_ONLINE_OFFLINE) == null) {
+            admin.addStateModelDef(clusterName, MODEL_ONLINE_OFFLINE, new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()));
+        }
+        if (admin.getStateModelDef(clusterName, MODEL_LEADER_STANDBY) == null) {
+            admin.addStateModelDef(clusterName, MODEL_LEADER_STANDBY, new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby()));
+        }
+
+        // add job engine as a resource, 1 partition
+        if (!admin.getResourcesInCluster(clusterName).contains(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE)) {
+            admin.addResource(clusterName, HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
+        }
+
+    }
+
+    /**
+     * Connect this node to the cluster as a Helix participant, handling LeaderStandby transitions and live-instance changes
+     * @param instanceName Helix instance name of this node, in hostName_port form
+     * @throws Exception if connecting the participant manager fails
+     */
+    protected void startInstance(String instanceName) throws Exception {
+        participantManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress);
+        participantManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(MODEL_LEADER_STANDBY), new LeaderStandbyStateModelFactory());
+        participantManager.connect();
+        participantManager.addLiveInstanceChangeListener(new KylinClusterLiveInstanceChangeListener());
+
+    }
+
+    /**
+     * Rebalance the resources mapped to the given tags; only TAG_JOB_ENGINE is mapped (to the job engine resource)
+     * @param tags tags of this instance; the number of instances carrying a tag is passed as the replica count
+     */
+    protected void rebalanceWithTag(List<String> tags) {
+        for (String tag : tags) {
+            if (tag.equals(TAG_JOB_ENGINE)) {
+                List<String> instances = admin.getInstancesInClusterWithTag(clusterName, TAG_JOB_ENGINE);
+                admin.rebalance(clusterName, RESOURCE_NAME_JOB_ENGINE, instances.size(), "", tag);
+            }
+        }
+    }
+
+    /**
+     * Start an embedded Helix controller for this cluster in STANDALONE mode
+     */
+    protected void startController() {
+        controllerManager = HelixControllerMain.startHelixController(zkAddress, clusterName, "controller", HelixControllerMain.STANDALONE);
+    }
+
+    public void stop() {
+        if (participantManager != null) {
+            participantManager.disconnect();
+        }
+
+        if (controllerManager != null) {
+            controllerManager.disconnect();
+        }
+    }
+    /** Returns this instance's state in the resource's first partition, or "ERROR" when it cannot be determined. */
+    public String getInstanceState(String resourceName) {
+        String instanceName = this.getCurrentInstanceName();
+        final ExternalView resourceExternalView = admin.getResourceExternalView(clusterName, resourceName);
+        if (resourceExternalView == null) {
+            logger.warn("fail to get ExternalView, clusterName:{} resourceName:{}", clusterName, resourceName);
+            return "ERROR";
+        }
+        final Set<String> partitionSet = resourceExternalView.getPartitionSet();
+        final Map<String, String> stateMap = partitionSet.isEmpty() ? null : resourceExternalView.getStateMap(partitionSet.iterator().next());
+        if (stateMap != null && stateMap.containsKey(instanceName)) {
+            return stateMap.get(instanceName);
+        } else {
+            logger.warn("fail to get state, clusterName:{} resourceName:{} instance:{}", clusterName, resourceName, instanceName);
+            return "ERROR";
+        }
+    }
+
+    /**
+     * Check whether this Kylin instance is LEADER (case-insensitive) for the given resource
+     * @return true only if getInstanceState(resourceName) is "LEADER"; false for any other state, including "ERROR"
+     */
+    public boolean isLeaderRole(String resourceName) {
+        final String instanceState = getInstanceState(resourceName);
+        logger.debug("instance state: {}", instanceState);
+        if ("LEADER".equalsIgnoreCase(instanceState)) {
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Register an instance in the cluster with a tag list, dropping and re-adding it if it already exists
+     * @param instanceName should be unique in format: hostName_port
+     * @param tags tags to attach to the instance; may be null
+     */
+    public void addInstance(String instanceName, List<String> tags) {
+        final int separator = instanceName.lastIndexOf('_');
+        Preconditions.checkArgument(separator > 0, "instance name should be in format hostName_port, but was: %s", instanceName);
+        InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+        instanceConfig.setHostName(instanceName.substring(0, separator));
+        instanceConfig.setPort(instanceName.substring(separator + 1));
+        if (tags != null) {
+            for (String tag : tags) {
+                instanceConfig.addTag(tag);
+            }
+        }
+
+        if (admin.getInstancesInCluster(clusterName).contains(instanceName)) {
+            admin.dropInstance(clusterName, instanceConfig);
+        }
+        admin.addInstance(clusterName, instanceConfig);
+    }
+    /** Returns the admin cached for this config; a new one (with its own ZKHelixAdmin) is built only on a cache miss. */
+    public static synchronized HelixClusterAdmin getInstance(KylinConfig kylinConfig) {
+        Preconditions.checkNotNull(kylinConfig);
+        if (!instanceMaps.containsKey(kylinConfig)) { instanceMaps.put(kylinConfig, new HelixClusterAdmin(kylinConfig)); }
+        return instanceMaps.get(kylinConfig);
+    }
+
+    public String getCurrentInstanceName() {
+        final String restAddress = StringUtils.trim(kylinConfig.getRestAddress());
+        if (StringUtils.isEmpty(restAddress)) {
+            throw new RuntimeException("There is no kylin.rest.address set in System property and kylin.properties;");
+        }
+        final int colon = restAddress.lastIndexOf(':');
+        Preconditions.checkArgument(colon > 0 && colon < restAddress.length() - 1, "kylin.rest.address should be in format host:port, but was: %s", restAddress);
+        // Helix instance name: <host>_<port>, the format addInstance() parses back
+        return restAddress.substring(0, colon) + "_" + restAddress.substring(colon + 1);
+    }
+
+    /**
+     * On live-instance changes, rebuild "kylin.rest.servers" (config and System property) from the live instance names and clear the Broadcaster cache.
+     */
+    class KylinClusterLiveInstanceChangeListener implements LiveInstanceChangeListener {
+        @Override
+        public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
+            List<String> instanceRestAddresses = Lists.newArrayList();
+            for (LiveInstance liveInstance : liveInstances) {
+                String instanceName = liveInstance.getInstanceName();
+                int indexOfUnderscore = instanceName.lastIndexOf("_");
+                instanceRestAddresses.add(indexOfUnderscore < 0 ? instanceName : instanceName.substring(0, indexOfUnderscore) + ":" + instanceName.substring(indexOfUnderscore + 1));
+            }
+            String restServersInCluster = StringUtils.join(instanceRestAddresses, ",");
+            kylinConfig.setProperty("kylin.rest.servers", restServersInCluster);
+            System.setProperty("kylin.rest.servers", restServersInCluster);
+            logger.info("kylin.rest.servers update to {}", restServersInCluster);
+            Broadcaster.clearCache();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
new file mode 100644
index 0000000..6694c81
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.rest.helix;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.MockJobLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helix factory for LeaderStandby transition handlers; only the job engine resource is handled (other resources get null). */
+public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
+    private static final Logger logger = LoggerFactory.getLogger(LeaderStandbyStateModelFactory.class);
+    
+    @Override
+    public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
+        if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) {
+            return new JobEngineStateModel();
+        }
+        // no handler for any other resource
+        return null;
+    }
+
+    public static class JobEngineStateModel extends TransitionHandler {
+        // becoming LEADER creates and initializes the DefaultScheduler on this node; leaving LEADER destroys it
+        @Transition(to = "LEADER", from = "STANDBY")
+        public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+            logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
+            try {
+                DefaultScheduler scheduler = DefaultScheduler.createInstance();
+                scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock());
+                while (!scheduler.hasStarted()) { // NOTE(review): unbounded wait; blocks this transition if the scheduler never starts
+                    logger.info("waiting for DefaultScheduler to start");
+                    Thread.sleep(1000);
+                }
+            } catch (Exception e) {
+                if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } // restore the interrupt flag before wrapping
+                logger.error("error start DefaultScheduler", e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Transition(to = "STANDBY", from = "LEADER")
+        public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+            logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()");
+            DefaultScheduler.destroyInstance();
+            
+        }
+
+        @Transition(to = "STANDBY", from = "OFFLINE")
+        public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+            logger.info("JobEngineStateModel.onBecomeStandbyFromOffline()");
+            // no-op apart from logging: the scheduler only runs while LEADER
+        }
+
+
+        @Transition(to = "OFFLINE", from = "STANDBY")
+        public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+            logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()");
+            // no-op apart from logging: nothing is running while STANDBY
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 1feb66f..51f241c 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -55,7 +55,7 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
-import org.apache.kylin.rest.helix.HelixJobEngineAdmin;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
 import org.apache.kylin.rest.request.MetricsRequest;
 import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.response.MetricsResponse;
@@ -570,8 +570,8 @@ public class CubeService extends BasicService {
     public void updateOnNewSegmentReady(String cubeName) {
         logger.debug("on updateOnNewSegmentReady: " + cubeName);
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        HelixJobEngineAdmin jobEngineAdmin = HelixJobEngineAdmin.getInstance(kylinConfig.getZookeeperAddress());
-        boolean isLeaderRole = jobEngineAdmin.isLeaderRole(kylinConfig.getClusterName(), HelixJobEngineAdmin.getCurrentInstanceName());
+        HelixClusterAdmin jobEngineAdmin = HelixClusterAdmin.getInstance(kylinConfig);
+        boolean isLeaderRole = jobEngineAdmin.isLeaderRole(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE);
         logger.debug("server is leader role ? " + isLeaderRole);
         if (isLeaderRole == true) {
             keepCubeRetention(cubeName);

http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
new file mode 100644
index 0000000..70525b3
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
@@ -0,0 +1,140 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+* 
+*     http://www.apache.org/licenses/LICENSE-2.0
+* 
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.InputStream;
+
+import static org.apache.kylin.rest.helix.HelixClusterAdmin.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Starts an embedded ZooKeeper on localhost:2199 and checks job-engine leadership and "kylin.rest.servers" discovery across two HelixClusterAdmin instances.
+*/
+public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
+
+    String zkAddress = "localhost:2199";
+    ZkServer server;
+
+    HelixClusterAdmin clusterAdmin1;
+    HelixClusterAdmin clusterAdmin2;
+    KylinConfig kylinConfig;
+
+    private static final String CLUSTER_NAME = "test_cluster";
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        // start a fresh embedded zookeeper on localhost:2199, with data/log dirs under /tmp/helix-quickstart
+        final File tmpDir = new File("/tmp/helix-quickstart");
+        FileUtil.fullyDelete(tmpDir);
+        tmpDir.mkdirs();
+        server = new ZkServer("/tmp/helix-quickstart/dataDir", "/tmp/helix-quickstart/logDir", new IDefaultNameSpace() {
+            @Override
+            public void createDefaultNameSpace(ZkClient zkClient) { // intentionally empty: no default znodes are created
+            }
+        }, 2199);
+        server.start();
+
+        kylinConfig = this.getTestConfig();
+        kylinConfig.setRestAddress("localhost:7070");
+        kylinConfig.setZookeeperAddress(zkAddress);
+        kylinConfig.setClusterName(CLUSTER_NAME);
+        // drop any leftover cluster so the test starts clean
+        final ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkAddress);
+        zkHelixAdmin.dropCluster(kylinConfig.getClusterName());
+
+    }
+
+    @Test
+    public void test() throws Exception {
+        
+        // 1. start one instance: it should lead the job engine and be the only rest server
+        clusterAdmin1 = getInstance(kylinConfig);
+        clusterAdmin1.start();
+
+        Thread.sleep(1000); // wait for Helix to converge; timing-based, may be flaky on slow machines
+        assertTrue(clusterAdmin1.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+        assertEquals(1, kylinConfig.getRestServers().length);
+        assertEquals("localhost:7070", kylinConfig.getRestServers()[0]);
+        
+        // 2. start a second instance (separate config copy, port 7072): instance 1 stays leader
+        InputStream is = IOUtils.toInputStream(kylinConfig.getConfigAsString());
+        KylinConfig kylinConfig2 = KylinConfig.getKylinConfigFromInputStream(is);
+        kylinConfig2.setRestAddress("localhost:7072");
+        is.close();
+
+
+        clusterAdmin2 = getInstance(kylinConfig2);
+        clusterAdmin2.start();
+
+        Thread.sleep(1000);
+        assertTrue(clusterAdmin1.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+        assertFalse(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+        assertEquals(2, kylinConfig.getRestServers().length);
+        assertEquals("localhost:7070", kylinConfig.getRestServers()[0]); // NOTE(review): relies on the order of live instances reported by Helix — confirm it is stable
+        assertEquals("localhost:7072", kylinConfig.getRestServers()[1]);
+        
+        // 3. shut down the first instance: leadership should move to instance 2
+        clusterAdmin1.stop();
+        clusterAdmin1 = null;
+        Thread.sleep(1000);
+        assertTrue(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+        assertEquals(1, kylinConfig.getRestServers().length); // NOTE(review): instance 1's listener is disconnected; presumably passes via the System property set by instance 2 — confirm
+        assertEquals("localhost:7072", kylinConfig.getRestServers()[0]);
+        
+        // 4. restart the first instance: it should regain leadership
+        clusterAdmin1 = getInstance(kylinConfig); // getInstance returns the admin cached for kylinConfig, i.e. the one stopped in step 3
+        clusterAdmin1.start();
+
+        Thread.sleep(1000);
+        assertTrue(clusterAdmin1.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+        assertFalse(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+        assertEquals(2, kylinConfig.getRestServers().length);
+        assertEquals("localhost:7070", kylinConfig.getRestServers()[0]);
+        assertEquals("localhost:7072", kylinConfig.getRestServers()[1]);
+    }
+
+    @After
+    public void tearDown() {
+        if (clusterAdmin1 != null) {
+            clusterAdmin1.stop();
+        }
+
+        if (clusterAdmin2 != null) {
+            clusterAdmin2.stop();
+        }
+        
+        server.shutdown();
+        cleanupTestMetadata();
+    }
+
+}