You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/14 03:53:21 UTC
[05/15] kylin git commit: KYLIN-1188 use helix 0.7.1 to manage the
job engine assignment
KYLIN-1188 use helix 0.7.1 to manage the job engine assignment
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ceec8980
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ceec8980
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ceec8980
Branch: refs/heads/helix-201602
Commit: ceec8980b5a02a5f2e2f5f8f8f34ded4b708a638
Parents: b26d957
Author: shaofengshi <sh...@apache.org>
Authored: Tue Jan 12 15:07:25 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Feb 6 13:31:49 2016 +0800
----------------------------------------------------------------------
build/conf/kylin.properties | 16 +-
.../apache/kylin/common/KylinConfigBase.java | 28 +++
.../test_case_data/sandbox/kylin.properties | 2 +
pom.xml | 1 +
.../kylin/rest/controller/JobController.java | 33 +--
.../kylin/rest/helix/HelixClusterAdmin.java | 245 +++++++++++++++++++
.../helix/LeaderStandbyStateModelFactory.java | 70 ++++++
.../apache/kylin/rest/service/CubeService.java | 6 +-
.../kylin/rest/helix/HelixClusterAdminTest.java | 140 +++++++++++
9 files changed, 516 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 44a282e..8456ecb 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -1,12 +1,24 @@
-## Config for Kylin Engine ##
+## Cluster related properties ##
+# Required, comma separated list of zk servers;
+kylin.zookeeper.address=
+# rest address of this instance, ;
+# optional, default be <hostname>:7070
+kylin.rest.address=
+
+# whether run a cluster controller in this node
+kylin.cluster.controller=true
# optional information for the owner of kylin platform, it can be your team's email
# currently it will be attached to each kylin's htable attribute
kylin.owner=whoami@kylin.apache.org
# List of web servers in use, this enables one web server instance to sync up with other servers.
-kylin.rest.servers=localhost:7070
+# Deprecated, cluster will self-discover and update this.
+# kylin.rest.servers=localhost:7070
+
+# Server mode: all, job, query
+kylin.server.mode=all
# The metadata store in hbase
kylin.metadata.url=kylin_metadata@hbase
http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5ce4ddc..a36b977 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -530,6 +530,34 @@ public class KylinConfigBase implements Serializable {
return getOptional("mail.sender", "");
}
+ public String getZookeeperAddress() {
+ return this.getOptional("kylin.zookeeper.address");
+ }
+
+ public void setZookeeperAddress(String zkAddress) {
+ setProperty("kylin.zookeeper.address", zkAddress);
+ }
+
+ public String getClusterName() {
+ return this.getOptional("kylin.cluster.name", getMetadataUrlPrefix());
+ }
+
+ public void setClusterName(String clusterName) {
+ setProperty("kylin.cluster.name", clusterName);
+ }
+
+ public boolean isClusterController() {
+ return Boolean.parseBoolean(getOptional("kylin.cluster.controller", "true"));
+ }
+
+ public String getRestAddress() {
+ return this.getOptional("kylin.rest.address");
+ }
+
+ public void setRestAddress(String restAddress) {
+ setProperty("kylin.rest.address", restAddress);
+ }
+
public String toString() {
return getMetadataUrl();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 18ff1cc..5ce636b 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -11,6 +11,8 @@ kylin.rest.servers=localhost:7070
#set display timezone on UI,format like[GMT+N or GMT-N]
kylin.rest.timezone=GMT-8
+kylin.server.mode=all
+>>>>>>> KYLIN-1188 use helix 0.7.1 to manage the job engine assignment
# The metadata store in hbase
kylin.metadata.url=kylin_metadata@hbase
http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 21fd8aa..8f04dcd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,7 @@
org/apache/kylin/**/tools/**:**/*CLI.java
</sonar.jacoco.excludes>
+ <helix.version>0.7.1</helix.version>
</properties>
<licenses>
http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 9dfb594..741b5ee 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -26,15 +26,13 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
-import com.google.common.collect.Lists;
-import joptsimple.internal.Strings;
+import com.google.common.base.Preconditions;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobTimeFilterEnum;
import org.apache.kylin.rest.exception.InternalErrorException;
-import org.apache.kylin.rest.helix.HelixJobEngineAdmin;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
import org.apache.kylin.rest.request.JobListRequest;
import org.apache.kylin.rest.service.JobService;
import org.slf4j.Logger;
@@ -51,8 +49,6 @@ import java.io.IOException;
import java.util.*;
/**
- * @author ysong1
- * @author Jack
*
*/
@Controller
@@ -76,9 +72,19 @@ public class JobController extends BasicController implements InitializingBean {
TimeZone tzone = TimeZone.getTimeZone(timeZone);
TimeZone.setDefault(tzone);
- final String instanceName = HelixJobEngineAdmin.getCurrentInstanceName();
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ Preconditions.checkNotNull(kylinConfig.getZookeeperAddress(), "'kylin.zookeeper.address' couldn't be null, set it in kylin.properties.");
+ final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
+ clusterAdmin.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ clusterAdmin.stop();
+ }
+ }));
+
}
/**
@@ -190,17 +196,4 @@ public class JobController extends BasicController implements InitializingBean {
this.jobService = jobService;
}
- private void updateKylinCluster(List<String> instances) {
- List<String> instanceRestAddresses = Lists.newArrayList();
- for (String instanceName : instances) {
- int indexOfUnderscore = instanceName.lastIndexOf("_");
- instanceRestAddresses.add(instanceName.substring(0, indexOfUnderscore) + ":" + instanceName.substring(indexOfUnderscore + 1));
- }
- String restServersInCluster = Strings.join(instanceRestAddresses, ",");
- KylinConfig.getInstanceFromEnv().setProperty("kylin.rest.servers", restServersInCluster);
- System.setProperty("kylin.rest.servers", restServersInCluster);
- Broadcaster.clearCache();
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
new file mode 100644
index 0000000..9983aae
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import joptsimple.internal.Strings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.*;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.*;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.rest.constant.Constant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Administrator of Kylin cluster
+ */
+public class HelixClusterAdmin {
+
+ public static final String RESOURCE_NAME_JOB_ENGINE = "Resource_JobEngine";
+
+ public static final String MODEL_LEADER_STANDBY = "LeaderStandby";
+ public static final String MODEL_ONLINE_OFFLINE = "OnlineOffline";
+ public static final String TAG_JOB_ENGINE = "Tag_JobEngine";
+
+ private static ConcurrentMap<KylinConfig, HelixClusterAdmin> instanceMaps = Maps.newConcurrentMap();
+ private HelixManager participantManager;
+ private HelixManager controllerManager;
+
+ private final KylinConfig kylinConfig;
+
+ private static final Logger logger = LoggerFactory.getLogger(HelixClusterAdmin.class);
+ private final String zkAddress;
+ private final ZKHelixAdmin admin;
+ private final String clusterName;
+
+ private HelixClusterAdmin(KylinConfig kylinConfig) {
+ this.kylinConfig = kylinConfig;
+ this.zkAddress = kylinConfig.getZookeeperAddress();
+ this.clusterName = kylinConfig.getClusterName();
+ this.admin = new ZKHelixAdmin(zkAddress);
+ }
+
+ public void start() throws Exception {
+ initCluster();
+ final String instanceName = getCurrentInstanceName();
+
+ // use the tag to mark node's role.
+ final List<String> instanceTags = Lists.newArrayList();
+ final boolean runJobEngine = Constant.SERVER_MODE_ALL.equalsIgnoreCase(kylinConfig.getServerMode()) || Constant.SERVER_MODE_JOB.equalsIgnoreCase(kylinConfig.getServerMode());
+ if (runJobEngine) {
+ instanceTags.add(HelixClusterAdmin.TAG_JOB_ENGINE);
+ }
+
+ addInstance(instanceName, instanceTags);
+ startInstance(instanceName);
+
+ rebalanceWithTag(instanceTags);
+
+ boolean startController = kylinConfig.isClusterController();
+ if (startController) {
+ startController();
+ }
+ }
+
+ /**
+ * Initiate the cluster, adding state model definitions and resource definitions
+ */
+ protected void initCluster() {
+ admin.addCluster(clusterName, false);
+ if (admin.getStateModelDef(clusterName, MODEL_ONLINE_OFFLINE) == null) {
+ admin.addStateModelDef(clusterName, MODEL_ONLINE_OFFLINE, new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()));
+ }
+ if (admin.getStateModelDef(clusterName, MODEL_LEADER_STANDBY) == null) {
+ admin.addStateModelDef(clusterName, MODEL_LEADER_STANDBY, new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby()));
+ }
+
+ // add job engine as a resource, 1 partition
+ if (!admin.getResourcesInCluster(clusterName).contains(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE)) {
+ admin.addResource(clusterName, HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
+ }
+
+ }
+
+ /**
+ * Start the instance and register the state model factory
+ * @param instanceName
+ * @throws Exception
+ */
+ protected void startInstance(String instanceName) throws Exception {
+ participantManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress);
+ participantManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(MODEL_LEADER_STANDBY), new LeaderStandbyStateModelFactory());
+ participantManager.connect();
+ participantManager.addLiveInstanceChangeListener(new KylinClusterLiveInstanceChangeListener());
+
+ }
+
+ /**
+ * Rebalance the resource with the tags
+ * @param tags
+ */
+ protected void rebalanceWithTag(List<String> tags) {
+ for (String tag : tags) {
+ if (tag.equals(TAG_JOB_ENGINE)) {
+ List<String> instances = admin.getInstancesInClusterWithTag(clusterName, TAG_JOB_ENGINE);
+ admin.rebalance(clusterName, RESOURCE_NAME_JOB_ENGINE, instances.size(), "", tag);
+ }
+ }
+ }
+
+ /**
+ * Start an embedded helix controller
+ */
+ protected void startController() {
+ controllerManager = HelixControllerMain.startHelixController(zkAddress, clusterName, "controller", HelixControllerMain.STANDALONE);
+ }
+
+ public void stop() {
+ if (participantManager != null) {
+ participantManager.disconnect();
+ }
+
+ if (controllerManager != null) {
+ controllerManager.disconnect();
+ }
+ }
+
+ public String getInstanceState(String resourceName) {
+ String instanceName = this.getCurrentInstanceName();
+ final ExternalView resourceExternalView = admin.getResourceExternalView(clusterName, resourceName);
+ if (resourceExternalView == null) {
+ logger.warn("fail to get ExternalView, clusterName:" + clusterName + " resourceName:" + resourceName);
+ return "ERROR";
+ }
+ final Set<String> partitionSet = resourceExternalView.getPartitionSet();
+ final Map<String, String> stateMap = resourceExternalView.getStateMap(partitionSet.iterator().next());
+ if (stateMap.containsKey(instanceName)) {
+ return stateMap.get(instanceName);
+ } else {
+ logger.warn("fail to get state, clusterName:" + clusterName + " resourceName:" + resourceName + " instance:" + instanceName);
+ return "ERROR";
+ }
+ }
+
+ /**
+ * Check whether current kylin instance is in the leader role
+ * @return
+ */
+ public boolean isLeaderRole(String resourceName) {
+ final String instanceState = getInstanceState(resourceName);
+ logger.debug("instance state: " + instanceState);
+ if ("LEADER".equalsIgnoreCase(instanceState)) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Add instance to cluster, with a tag list
+ * @param instanceName should be unique in format: hostName_port
+ * @param tags
+ */
+ public void addInstance(String instanceName, List<String> tags) {
+ final String hostname = instanceName.substring(0, instanceName.lastIndexOf("_"));
+ final String port = instanceName.substring(instanceName.lastIndexOf("_") + 1);
+ InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+ instanceConfig.setHostName(hostname);
+ instanceConfig.setPort(port);
+ if (tags != null) {
+ for (String tag : tags) {
+ instanceConfig.addTag(tag);
+ }
+ }
+
+ if (admin.getInstancesInCluster(clusterName).contains(instanceName)) {
+ admin.dropInstance(clusterName, instanceConfig);
+ }
+ admin.addInstance(clusterName, instanceConfig);
+ }
+
+ public static HelixClusterAdmin getInstance(KylinConfig kylinConfig) {
+ Preconditions.checkNotNull(kylinConfig);
+ instanceMaps.putIfAbsent(kylinConfig, new HelixClusterAdmin(kylinConfig));
+ return instanceMaps.get(kylinConfig);
+ }
+
+ public String getCurrentInstanceName() {
+ final String restAddress = kylinConfig.getRestAddress();
+ if (StringUtils.isEmpty(restAddress)) {
+ throw new RuntimeException("There is no kylin.rest.address set in System property and kylin.properties;");
+ }
+
+ final String hostname = Preconditions.checkNotNull(restAddress.substring(0, restAddress.lastIndexOf(":")), "failed to get HostName of this server");
+ final String port = Preconditions.checkNotNull(restAddress.substring(restAddress.lastIndexOf(":") + 1), "failed to get port of this server");
+ return hostname + "_" + port;
+ }
+
+ /**
+ * Listen to the cluster's event, update "kylin.rest.servers" to the live instances.
+ */
+ class KylinClusterLiveInstanceChangeListener implements LiveInstanceChangeListener {
+ @Override
+ public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
+ List<String> instanceRestAddresses = Lists.newArrayList();
+ for (LiveInstance liveInstance : liveInstances) {
+ String instanceName = liveInstance.getInstanceName();
+ int indexOfUnderscore = instanceName.lastIndexOf("_");
+ instanceRestAddresses.add(instanceName.substring(0, indexOfUnderscore) + ":" + instanceName.substring(indexOfUnderscore + 1));
+ }
+ String restServersInCluster = Strings.join(instanceRestAddresses, ",");
+ kylinConfig.setProperty("kylin.rest.servers", restServersInCluster);
+ System.setProperty("kylin.rest.servers", restServersInCluster);
+ logger.info("kylin.rest.servers update to " + restServersInCluster);
+ Broadcaster.clearCache();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
new file mode 100644
index 0000000..6694c81
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.rest.helix;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.MockJobLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
+ private static final Logger logger = LoggerFactory.getLogger(LeaderStandbyStateModelFactory.class);
+
+ @Override
+ public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
+ if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) {
+ return new JobEngineStateModel();
+ }
+
+ return null;
+ }
+
+ public static class JobEngineStateModel extends TransitionHandler {
+
+ @Transition(to = "LEADER", from = "STANDBY")
+ public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+ logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
+ try {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ DefaultScheduler scheduler = DefaultScheduler.createInstance();
+ scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
+ while (!scheduler.hasStarted()) {
+ logger.error("scheduler has not been started");
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ logger.error("error start DefaultScheduler", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Transition(to = "STANDBY", from = "LEADER")
+ public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+ logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()");
+ DefaultScheduler.destroyInstance();
+
+ }
+
+ @Transition(to = "STANDBY", from = "OFFLINE")
+ public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+ logger.info("JobEngineStateModel.onBecomeStandbyFromOffline()");
+
+ }
+
+
+ @Transition(to = "OFFLINE", from = "STANDBY")
+ public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+ logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()");
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 1feb66f..51f241c 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -55,7 +55,7 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
-import org.apache.kylin.rest.helix.HelixJobEngineAdmin;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
import org.apache.kylin.rest.request.MetricsRequest;
import org.apache.kylin.rest.response.HBaseResponse;
import org.apache.kylin.rest.response.MetricsResponse;
@@ -570,8 +570,8 @@ public class CubeService extends BasicService {
public void updateOnNewSegmentReady(String cubeName) {
logger.debug("on updateOnNewSegmentReady: " + cubeName);
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- HelixJobEngineAdmin jobEngineAdmin = HelixJobEngineAdmin.getInstance(kylinConfig.getZookeeperAddress());
- boolean isLeaderRole = jobEngineAdmin.isLeaderRole(kylinConfig.getClusterName(), HelixJobEngineAdmin.getCurrentInstanceName());
+ HelixClusterAdmin jobEngineAdmin = HelixClusterAdmin.getInstance(kylinConfig);
+ boolean isLeaderRole = jobEngineAdmin.isLeaderRole(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE);
logger.debug("server is leader role ? " + isLeaderRole);
if (isLeaderRole == true) {
keepCubeRetention(cubeName);
http://git-wip-us.apache.org/repos/asf/kylin/blob/ceec8980/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
new file mode 100644
index 0000000..70525b3
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
@@ -0,0 +1,140 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.InputStream;
+
+import static org.apache.kylin.rest.helix.HelixClusterAdmin.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+*/
+public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
+
+ String zkAddress = "localhost:2199";
+ ZkServer server;
+
+ HelixClusterAdmin clusterAdmin1;
+ HelixClusterAdmin clusterAdmin2;
+ KylinConfig kylinConfig;
+
+ private static final String CLUSTER_NAME = "test_cluster";
+
+ @Before
+ public void setup() throws Exception {
+ createTestMetadata();
+ // start zookeeper on localhost
+ final File tmpDir = new File("/tmp/helix-quickstart");
+ FileUtil.fullyDelete(tmpDir);
+ tmpDir.mkdirs();
+ server = new ZkServer("/tmp/helix-quickstart/dataDir", "/tmp/helix-quickstart/logDir", new IDefaultNameSpace() {
+ @Override
+ public void createDefaultNameSpace(ZkClient zkClient) {
+ }
+ }, 2199);
+ server.start();
+
+ kylinConfig = this.getTestConfig();
+ kylinConfig.setRestAddress("localhost:7070");
+ kylinConfig.setZookeeperAddress(zkAddress);
+ kylinConfig.setClusterName(CLUSTER_NAME);
+
+ final ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkAddress);
+ zkHelixAdmin.dropCluster(kylinConfig.getClusterName());
+
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ // 1. start one instance
+ clusterAdmin1 = getInstance(kylinConfig);
+ clusterAdmin1.start();
+
+ Thread.sleep(1000);
+ assertTrue(clusterAdmin1.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+ assertEquals(1, kylinConfig.getRestServers().length);
+ assertEquals("localhost:7070", kylinConfig.getRestServers()[0]);
+
+ // 2. start second instance
+ InputStream is = IOUtils.toInputStream(kylinConfig.getConfigAsString());
+ KylinConfig kylinConfig2 = KylinConfig.getKylinConfigFromInputStream(is);
+ kylinConfig2.setRestAddress("localhost:7072");
+ is.close();
+
+
+ clusterAdmin2 = getInstance(kylinConfig2);
+ clusterAdmin2.start();
+
+ Thread.sleep(1000);
+ assertTrue(clusterAdmin1.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+ assertFalse(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+ assertEquals(2, kylinConfig.getRestServers().length);
+ assertEquals("localhost:7070", kylinConfig.getRestServers()[0]);
+ assertEquals("localhost:7072", kylinConfig.getRestServers()[1]);
+
+ // 3. shutdown the first instance
+ clusterAdmin1.stop();
+ clusterAdmin1 = null;
+ Thread.sleep(1000);
+ assertTrue(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+ assertEquals(1, kylinConfig.getRestServers().length);
+ assertEquals("localhost:7072", kylinConfig.getRestServers()[0]);
+
+ // 4. recover first instance
+ clusterAdmin1 = getInstance(kylinConfig);
+ clusterAdmin1.start();
+
+ Thread.sleep(1000);
+ assertTrue(clusterAdmin1.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+ assertFalse(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
+ assertEquals(2, kylinConfig.getRestServers().length);
+ assertEquals("localhost:7070", kylinConfig.getRestServers()[0]);
+ assertEquals("localhost:7072", kylinConfig.getRestServers()[1]);
+ }
+
+ @After
+ public void tearDown() {
+ if (clusterAdmin1 != null) {
+ clusterAdmin1.stop();
+ }
+
+ if (clusterAdmin2 != null) {
+ clusterAdmin2.stop();
+ }
+
+ server.shutdown();
+ cleanupTestMetadata();
+ }
+
+}