You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/25 02:29:54 UTC
[1/2] git commit: [HELIX-364] Ensure participants and controllers
sharing a session id are treated uniquely, rb=17256
Updated Branches:
refs/heads/master 6952c8f00 -> f1ffa8619
[HELIX-364] Ensure participants and controllers sharing a session id are treated uniquely, rb=17256
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5cf39b96
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5cf39b96
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5cf39b96
Branch: refs/heads/master
Commit: 5cf39b965cec3fc72c689d07ddb8642cde7f43a6
Parents: 7fca871
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Jan 23 11:33:57 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jan 24 17:29:21 2014 -0800
----------------------------------------------------------------------
.../controller/GenericHelixController.java | 11 +-
.../stages/ExternalViewComputeStage.java | 13 +-
.../manager/zk/DistributedLeaderElection.java | 64 +-----
.../helix/manager/zk/ZkHelixLeaderElection.java | 21 +-
.../DistClusterControllerElection.java | 74 +------
.../helix/integration/TestSchedulerMessage.java | 1 +
.../helix/integration/TestSharedConnection.java | 199 +++++++++++++++++++
7 files changed, 236 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 28a5b06..e9924a2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -43,6 +43,7 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.SessionId;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
@@ -484,7 +485,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
Map<String, LiveInstance> curSessions = new HashMap<String, LiveInstance>();
for (LiveInstance liveInstance : liveInstances) {
curInstances.put(liveInstance.getInstanceName(), liveInstance);
- curSessions.put(liveInstance.getTypedSessionId().stringify(), liveInstance);
+ curSessions.put(liveInstance.getInstanceName() + "|" + liveInstance.getSessionId(),
+ liveInstance);
}
Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();
@@ -497,7 +499,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
if (!curSessions.containsKey(session)) {
// remove current-state listener for expired session
String instanceName = lastSessions.get(session).getInstanceName();
- manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
+ SessionId sessionId = lastSessions.get(session).getTypedSessionId();
+ manager
+ .removeListener(keyBuilder.currentStates(instanceName, sessionId.toString()), this);
}
}
}
@@ -514,9 +518,10 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
for (String session : curSessions.keySet()) {
if (lastSessions == null || !lastSessions.containsKey(session)) {
String instanceName = curSessions.get(session).getInstanceName();
+ SessionId sessionId = curSessions.get(session).getTypedSessionId();
try {
// add current-state listeners for new sessions
- manager.addCurrentStateChangeListener(this, instanceName, session);
+ manager.addCurrentStateChangeListener(this, instanceName, sessionId.toString());
logger.info(manager.getInstanceName() + " added current-state listener for instance: "
+ instanceName + ", session: " + session + ", listener: " + this);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index a15e6b3..e8e42bf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -46,7 +46,6 @@ import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
@@ -120,12 +119,14 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
ClusterStatusMonitor clusterStatusMonitor =
(ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
- if (currentResource != null) {
+ if (clusterStatusMonitor != null && currentResource != null) {
IdealState idealState = currentResource.getIdealState();
- if (clusterStatusMonitor != null
- && !idealState.getStateModelDefRef().equalsIgnoreCase(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- clusterStatusMonitor.onExternalViewChange(view, idealState);
+ if (idealState != null) {
+ StateModelDefId stateModelDefId = idealState.getStateModelDefId();
+ if (stateModelDefId != null
+ && !stateModelDefId.equals(StateModelDefId.SchedulerTaskQueue)) {
+ clusterStatusMonitor.onExternalViewChange(view, idealState);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index 9836020..86b8d41 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import java.lang.management.ManagementFactory;
import java.util.List;
import org.apache.helix.ControllerChangeListener;
@@ -29,11 +28,7 @@ import org.apache.helix.HelixTimerTask;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyType;
import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
-import org.apache.helix.model.LeaderHistory;
-import org.apache.helix.model.LiveInstance;
import org.apache.log4j.Logger;
/**
@@ -84,12 +79,12 @@ public class DistributedLeaderElection implements ControllerChangeListener {
Builder keyBuilder = accessor.keyBuilder();
while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
- boolean success = tryUpdateController(manager);
+ boolean success = ZkHelixLeaderElection.tryUpdateController(manager);
if (success) {
LOG.info(_manager.getInstanceName() + " acquired leadership for cluster: "
+ _manager.getClusterName());
- updateHistory(manager);
+ ZkHelixLeaderElection.updateHistory(manager);
_manager.getHelixDataAccessor().getBaseDataAccessor().reset();
controllerHelper.addListenersToController(_controller);
controllerHelper.startControllerTimerTasks();
@@ -111,59 +106,4 @@ public class DistributedLeaderElection implements ControllerChangeListener {
LOG.error("Exception when trying to become leader", e);
}
}
-
- private boolean tryUpdateController(HelixManager manager) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- LiveInstance leader = new LiveInstance(manager.getInstanceName());
- try {
- leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
- leader.setSessionId(manager.getSessionId());
- leader.setHelixVersion(manager.getVersion());
- if (ZKPropertyTransferServer.getInstance() != null) {
- String zkPropertyTransferServiceUrl =
- ZKPropertyTransferServer.getInstance().getWebserviceUrl();
- if (zkPropertyTransferServiceUrl != null) {
- leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
- }
- } else {
- LOG.warn("ZKPropertyTransferServer instnace is null");
- }
- boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
- if (success) {
- return true;
- } else {
- LOG.info("Unable to become leader probably because some other controller becames the leader");
- }
- } catch (Exception e) {
- LOG.error(
- "Exception when trying to updating leader record in cluster:" + manager.getClusterName()
- + ". Need to check again whether leader node has been created or not", e);
- }
-
- leader = accessor.getProperty(keyBuilder.controllerLeader());
- if (leader != null) {
- String leaderSessionId = leader.getTypedSessionId().stringify();
- LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
- + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
-
- if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) {
- return true;
- }
- }
- return false;
- }
-
- private void updateHistory(HelixManager manager) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
- if (history == null) {
- history = new LeaderHistory(PropertyType.HISTORY.toString());
- }
- history.updateHistory(manager.getClusterName(), manager.getInstanceName());
- accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
index 77da158..cc99b8e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -22,8 +22,6 @@ package org.apache.helix.manager.zk;
import java.lang.management.ManagementFactory;
import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.HelixConnection;
-import org.apache.helix.HelixController;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
@@ -84,7 +82,6 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
|| changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
LOG.info(_controllerId + " is trying to acquire leadership for cluster: " + _clusterId);
-
while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
boolean success = tryUpdateController(_manager);
if (success) {
@@ -122,7 +119,12 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
}
}
- private boolean tryUpdateController(HelixManager manager) {
+ /**
+ * Try to become the leader controller
+ * @param manager a live helix manager connection
+ * @return true if this controller has been elected the leader, false otherwise
+ */
+ public static boolean tryUpdateController(HelixManager manager) {
HelixDataAccessor accessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -155,17 +157,22 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
leader = accessor.getProperty(keyBuilder.controllerLeader());
if (leader != null) {
String leaderSessionId = leader.getSessionId();
+ String leaderId = leader.getId();
LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
+ leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
-
- if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) {
+ if (leaderId != null && leaderId.equals(manager.getInstanceName()) && leaderSessionId != null
+ && leaderSessionId.equals(manager.getSessionId())) {
return true;
}
}
return false;
}
- private void updateHistory(HelixManager manager) {
+ /**
+ * Update the history with this controller as the most recent leader
+ * @param manager active helix manager connection
+ */
+ public static void updateHistory(HelixManager manager) {
HelixDataAccessor accessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
index 45bee64..ee7efcd 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
@@ -19,8 +19,6 @@ package org.apache.helix.participant;
* under the License.
*/
-import java.lang.management.ManagementFactory;
-
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -28,12 +26,9 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyType;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
-import org.apache.helix.model.LeaderHistory;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.manager.zk.ZkHelixLeaderElection;
import org.apache.log4j.Logger;
// TODO: merge with GenericHelixController
@@ -75,9 +70,9 @@ public class DistClusterControllerElection implements ControllerChangeListener {
Builder keyBuilder = accessor.keyBuilder();
while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
- boolean success = tryUpdateController(manager);
+ boolean success = ZkHelixLeaderElection.tryUpdateController(manager);
if (success) {
- updateHistory(manager);
+ ZkHelixLeaderElection.updateHistory(manager);
if (type == InstanceType.CONTROLLER) {
HelixControllerMain.addListenersToController(manager, _controller);
manager.startTimerTasks();
@@ -95,9 +90,8 @@ public class DistClusterControllerElection implements ControllerChangeListener {
}
}
- }
- else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
- if(_leader != null) {
+ } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
+ if (_leader != null) {
_leader.disconnect();
}
_controller.shutdownClusterStatusMonitor(manager.getClusterName());
@@ -106,62 +100,4 @@ public class DistClusterControllerElection implements ControllerChangeListener {
LOG.error("Exception when trying to become leader", e);
}
}
-
- private boolean tryUpdateController(HelixManager manager) {
- // DataAccessor dataAccessor = manager.getDataAccessor();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- LiveInstance leader = new LiveInstance(manager.getInstanceName());
- try {
- leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
- // TODO: this session id is not the leader's session id in
- // distributed mode
- leader.setSessionId(manager.getSessionId());
- leader.setHelixVersion(manager.getVersion());
- if (ZKPropertyTransferServer.getInstance() != null) {
- String zkPropertyTransferServiceUrl =
- ZKPropertyTransferServer.getInstance().getWebserviceUrl();
- if (zkPropertyTransferServiceUrl != null) {
- leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
- }
- } else {
- LOG.warn("ZKPropertyTransferServer instnace is null");
- }
- boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
- if (success) {
- return true;
- } else {
- LOG.info("Unable to become leader probably because some other controller becames the leader");
- }
- } catch (Exception e) {
- LOG.error(
- "Exception when trying to updating leader record in cluster:" + manager.getClusterName()
- + ". Need to check again whether leader node has been created or not", e);
- }
-
- leader = accessor.getProperty(keyBuilder.controllerLeader());
- if (leader != null) {
- String leaderSessionId = leader.getTypedSessionId().stringify();
- LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
- + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
-
- if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) {
- return true;
- }
- }
- return false;
- }
-
- private void updateHistory(HelixManager manager) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
- if (history == null) {
- history = new LeaderHistory(PropertyType.HISTORY.toString());
- }
- history.updateHistory(manager.getClusterName(), manager.getInstanceName());
- accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 6066859..d78bd9d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -725,6 +725,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
@Test()
public void testSchedulerMsg4() throws Exception {
_factory._results.clear();
+ Thread.sleep(2000);
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
_participants[i].getMessagingService().registerMessageHandlerFactory(
http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java
new file mode 100644
index 0000000..bf89cdb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java
@@ -0,0 +1,199 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.manager.zk.ZkHelixLeaderElection;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Ensure that the external view is able to update properly when participants share a connection.
+ */
+public class TestSharedConnection extends ZkUnitTestBase {
+ /**
+ * Ensure that the external view is able to update properly when participants share a connection.
+ */
+ @Test
+ public void testSharedParticipantConnection() throws Exception {
+ final int NUM_PARTICIPANTS = 2;
+ final int NUM_PARTITIONS = 4;
+ final int NUM_REPLICAS = 2;
+ final String RESOURCE_NAME = "TestDB0";
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ // Set up cluster
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ NUM_PARTITIONS, // partitions per resource
+ NUM_PARTICIPANTS, // number of nodes
+ NUM_REPLICAS, // replicas
+ "OnlineOffline", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+ // Connect
+ HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+ connection.connect();
+
+ // Start some participants
+ HelixParticipant[] participants = new HelixParticipant[NUM_PARTICIPANTS];
+ for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+ participants[i] =
+ connection.createParticipant(ClusterId.from(clusterName),
+ ParticipantId.from("localhost_" + (12918 + i)));
+ participants[i].getStateMachineEngine().registerStateModelFactory(
+ StateModelDefId.from("OnlineOffline"), new TestHelixConnection.MockStateModelFactory());
+ participants[i].startAsync();
+ }
+
+ // Start the controller
+ HelixController controller =
+ connection.createController(ClusterId.from(clusterName), ControllerId.from("controller"));
+ controller.startAsync();
+ Thread.sleep(500);
+
+ // Verify balanced cluster
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Drop a partition from the first participant
+ HelixAdmin admin = connection.createClusterManagementTool();
+ IdealState idealState = admin.getResourceIdealState(clusterName, RESOURCE_NAME);
+ Map<ParticipantId, State> participantStateMap =
+ idealState.getParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_0"));
+ participantStateMap.remove(ParticipantId.from("localhost_12918"));
+ idealState.setParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_0"), participantStateMap);
+ admin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+ Thread.sleep(1000);
+
+ // Verify balanced cluster
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Drop a partition from the second participant
+ participantStateMap = idealState.getParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_1"));
+ participantStateMap.remove(ParticipantId.from("localhost_12919"));
+ idealState.setParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_1"), participantStateMap);
+ admin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+ Thread.sleep(1000);
+
+ // Verify balanced cluster
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Clean up
+ controller.stopAsync();
+ for (HelixParticipant participant : participants) {
+ participant.stopAsync();
+ }
+ admin.dropCluster(clusterName);
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+ * Ensure that only one controller with a shared connection thinks it's leader
+ */
+ @Test
+ public void testSharedControllerConnection() throws Exception {
+ final int NUM_PARTICIPANTS = 2;
+ final int NUM_PARTITIONS = 4;
+ final int NUM_REPLICAS = 2;
+ final int NUM_CONTROLLERS = 2;
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ // Set up cluster
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ NUM_PARTITIONS, // partitions per resource
+ NUM_PARTICIPANTS, // number of nodes
+ NUM_REPLICAS, // replicas
+ "OnlineOffline", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+ // Connect
+ HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+ connection.connect();
+
+ // Create a couple controllers
+ HelixController[] controllers = new HelixController[NUM_CONTROLLERS];
+ for (int i = 0; i < NUM_CONTROLLERS; i++) {
+ controllers[i] =
+ connection.createController(ClusterId.from(clusterName),
+ ControllerId.from("controller_" + i));
+ controllers[i].startAsync();
+ }
+ Thread.sleep(1000);
+
+ // Now verify that exactly one is leader
+ int leaderCount = 0;
+ for (HelixController controller : controllers) {
+ HelixConnectionAdaptor adaptor = new HelixConnectionAdaptor(controller);
+ boolean result = ZkHelixLeaderElection.tryUpdateController(adaptor);
+ if (result) {
+ leaderCount++;
+ }
+ }
+ Assert.assertEquals(leaderCount, 1);
+
+ // Clean up
+ for (HelixController controller : controllers) {
+ controller.stopAsync();
+ }
+ HelixAdmin admin = connection.createClusterManagementTool();
+ admin.dropCluster(clusterName);
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
[2/2] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/helix
Posted by ka...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/helix
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f1ffa861
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f1ffa861
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f1ffa861
Branch: refs/heads/master
Commit: f1ffa8619ef21b7521d73b22348399ef79d90cca
Parents: 5cf39b9 6952c8f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Jan 24 17:29:38 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jan 24 17:29:38 2014 -0800
----------------------------------------------------------------------
.../src/site/markdown/Tutorial.md | 1 +
.../src/site/markdown/tutorial_agent.md | 169 +++++++++++++++++++
.../src/site/markdown/Tutorial.md | 1 +
.../src/site/markdown/tutorial_agent.md | 169 +++++++++++++++++++
.../src/site/markdown/Tutorial.md | 1 +
.../src/site/markdown/tutorial_agent.md | 169 +++++++++++++++++++
website/trunk/src/site/markdown/Tutorial.md | 1 +
.../trunk/src/site/markdown/tutorial_agent.md | 169 +++++++++++++++++++
8 files changed, 680 insertions(+)
----------------------------------------------------------------------