You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/25 02:29:54 UTC

[1/2] git commit: [HELIX-364] Ensure participants and controllers sharing a session id are treated uniquely, rb=17256

Updated Branches:
  refs/heads/master 6952c8f00 -> f1ffa8619


[HELIX-364] Ensure participants and controllers sharing a session id are treated uniquely, rb=17256


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5cf39b96
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5cf39b96
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5cf39b96

Branch: refs/heads/master
Commit: 5cf39b965cec3fc72c689d07ddb8642cde7f43a6
Parents: 7fca871
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Jan 23 11:33:57 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jan 24 17:29:21 2014 -0800

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  11 +-
 .../stages/ExternalViewComputeStage.java        |  13 +-
 .../manager/zk/DistributedLeaderElection.java   |  64 +-----
 .../helix/manager/zk/ZkHelixLeaderElection.java |  21 +-
 .../DistClusterControllerElection.java          |  74 +------
 .../helix/integration/TestSchedulerMessage.java |   1 +
 .../helix/integration/TestSharedConnection.java | 199 +++++++++++++++++++
 7 files changed, 236 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 28a5b06..e9924a2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -43,6 +43,7 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.SessionId;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
@@ -484,7 +485,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     Map<String, LiveInstance> curSessions = new HashMap<String, LiveInstance>();
     for (LiveInstance liveInstance : liveInstances) {
       curInstances.put(liveInstance.getInstanceName(), liveInstance);
-      curSessions.put(liveInstance.getTypedSessionId().stringify(), liveInstance);
+      curSessions.put(liveInstance.getInstanceName() + "|" + liveInstance.getSessionId(),
+          liveInstance);
     }
 
     Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();
@@ -497,7 +499,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
         if (!curSessions.containsKey(session)) {
           // remove current-state listener for expired session
           String instanceName = lastSessions.get(session).getInstanceName();
-          manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
+          SessionId sessionId = lastSessions.get(session).getTypedSessionId();
+          manager
+              .removeListener(keyBuilder.currentStates(instanceName, sessionId.toString()), this);
         }
       }
     }
@@ -514,9 +518,10 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     for (String session : curSessions.keySet()) {
       if (lastSessions == null || !lastSessions.containsKey(session)) {
         String instanceName = curSessions.get(session).getInstanceName();
+        SessionId sessionId = curSessions.get(session).getTypedSessionId();
         try {
           // add current-state listeners for new sessions
-          manager.addCurrentStateChangeListener(this, instanceName, session);
+          manager.addCurrentStateChangeListener(this, instanceName, sessionId.toString());
           logger.info(manager.getInstanceName() + " added current-state listener for instance: "
               + instanceName + ", session: " + session + ", listener: " + this);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index a15e6b3..e8e42bf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -46,7 +46,6 @@ import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
@@ -120,12 +119,14 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
       ClusterStatusMonitor clusterStatusMonitor =
           (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
       Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
-      if (currentResource != null) {
+      if (clusterStatusMonitor != null && currentResource != null) {
         IdealState idealState = currentResource.getIdealState();
-        if (clusterStatusMonitor != null
-            && !idealState.getStateModelDefRef().equalsIgnoreCase(
-                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-          clusterStatusMonitor.onExternalViewChange(view, idealState);
+        if (idealState != null) {
+          StateModelDefId stateModelDefId = idealState.getStateModelDefId();
+          if (stateModelDefId != null
+              && !stateModelDefId.equals(StateModelDefId.SchedulerTaskQueue)) {
+            clusterStatusMonitor.onExternalViewChange(view, idealState);
+          }
         }
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index 9836020..86b8d41 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.lang.management.ManagementFactory;
 import java.util.List;
 
 import org.apache.helix.ControllerChangeListener;
@@ -29,11 +28,7 @@ import org.apache.helix.HelixTimerTask;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyType;
 import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
-import org.apache.helix.model.LeaderHistory;
-import org.apache.helix.model.LiveInstance;
 import org.apache.log4j.Logger;
 
 /**
@@ -84,12 +79,12 @@ public class DistributedLeaderElection implements ControllerChangeListener {
         Builder keyBuilder = accessor.keyBuilder();
 
         while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
-          boolean success = tryUpdateController(manager);
+          boolean success = ZkHelixLeaderElection.tryUpdateController(manager);
           if (success) {
             LOG.info(_manager.getInstanceName() + " acquired leadership for cluster: "
                 + _manager.getClusterName());
 
-            updateHistory(manager);
+            ZkHelixLeaderElection.updateHistory(manager);
             _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
             controllerHelper.addListenersToController(_controller);
             controllerHelper.startControllerTimerTasks();
@@ -111,59 +106,4 @@ public class DistributedLeaderElection implements ControllerChangeListener {
       LOG.error("Exception when trying to become leader", e);
     }
   }
-
-  private boolean tryUpdateController(HelixManager manager) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    LiveInstance leader = new LiveInstance(manager.getInstanceName());
-    try {
-      leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
-      leader.setSessionId(manager.getSessionId());
-      leader.setHelixVersion(manager.getVersion());
-      if (ZKPropertyTransferServer.getInstance() != null) {
-        String zkPropertyTransferServiceUrl =
-            ZKPropertyTransferServer.getInstance().getWebserviceUrl();
-        if (zkPropertyTransferServiceUrl != null) {
-          leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
-        }
-      } else {
-        LOG.warn("ZKPropertyTransferServer instnace is null");
-      }
-      boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
-      if (success) {
-        return true;
-      } else {
-        LOG.info("Unable to become leader probably because some other controller becames the leader");
-      }
-    } catch (Exception e) {
-      LOG.error(
-          "Exception when trying to updating leader record in cluster:" + manager.getClusterName()
-              + ". Need to check again whether leader node has been created or not", e);
-    }
-
-    leader = accessor.getProperty(keyBuilder.controllerLeader());
-    if (leader != null) {
-      String leaderSessionId = leader.getTypedSessionId().stringify();
-      LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
-          + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
-
-      if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void updateHistory(HelixManager manager) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
-    if (history == null) {
-      history = new LeaderHistory(PropertyType.HISTORY.toString());
-    }
-    history.updateHistory(manager.getClusterName(), manager.getInstanceName());
-    accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
index 77da158..cc99b8e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -22,8 +22,6 @@ package org.apache.helix.manager.zk;
 import java.lang.management.ManagementFactory;
 
 import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.HelixConnection;
-import org.apache.helix.HelixController;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
@@ -84,7 +82,6 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
           || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
         LOG.info(_controllerId + " is trying to acquire leadership for cluster: " + _clusterId);
 
-
         while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
           boolean success = tryUpdateController(_manager);
           if (success) {
@@ -122,7 +119,12 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
     }
   }
 
-  private boolean tryUpdateController(HelixManager manager) {
+  /**
+   * Try to become the leader controller
+   * @param manager a live helix manager connection
+   * @return true if this controller has been elected the leader, false otherwise
+   */
+  public static boolean tryUpdateController(HelixManager manager) {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
@@ -155,17 +157,22 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
     leader = accessor.getProperty(keyBuilder.controllerLeader());
     if (leader != null) {
       String leaderSessionId = leader.getSessionId();
+      String leaderId = leader.getId();
       LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
           + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
-
-      if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) {
+      if (leaderId != null && leaderId.equals(manager.getInstanceName()) && leaderSessionId != null
+          && leaderSessionId.equals(manager.getSessionId())) {
         return true;
       }
     }
     return false;
   }
 
-  private void updateHistory(HelixManager manager) {
+  /**
+   * Update the history with this controller as the most recent leader
+   * @param manager active helix manager connection
+   */
+  public static void updateHistory(HelixManager manager) {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
index 45bee64..ee7efcd 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
@@ -19,8 +19,6 @@ package org.apache.helix.participant;
  * under the License.
  */
 
-import java.lang.management.ManagementFactory;
-
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -28,12 +26,9 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyType;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
-import org.apache.helix.model.LeaderHistory;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.manager.zk.ZkHelixLeaderElection;
 import org.apache.log4j.Logger;
 
 // TODO: merge with GenericHelixController
@@ -75,9 +70,9 @@ public class DistClusterControllerElection implements ControllerChangeListener {
         Builder keyBuilder = accessor.keyBuilder();
 
         while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
-          boolean success = tryUpdateController(manager);
+          boolean success = ZkHelixLeaderElection.tryUpdateController(manager);
           if (success) {
-            updateHistory(manager);
+            ZkHelixLeaderElection.updateHistory(manager);
             if (type == InstanceType.CONTROLLER) {
               HelixControllerMain.addListenersToController(manager, _controller);
               manager.startTimerTasks();
@@ -95,9 +90,8 @@ public class DistClusterControllerElection implements ControllerChangeListener {
 
           }
         }
-      } 
-      else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
-        if(_leader != null) {
+      } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
+        if (_leader != null) {
           _leader.disconnect();
         }
         _controller.shutdownClusterStatusMonitor(manager.getClusterName());
@@ -106,62 +100,4 @@ public class DistClusterControllerElection implements ControllerChangeListener {
       LOG.error("Exception when trying to become leader", e);
     }
   }
-
-  private boolean tryUpdateController(HelixManager manager) {
-    // DataAccessor dataAccessor = manager.getDataAccessor();
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    LiveInstance leader = new LiveInstance(manager.getInstanceName());
-    try {
-      leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
-      // TODO: this session id is not the leader's session id in
-      // distributed mode
-      leader.setSessionId(manager.getSessionId());
-      leader.setHelixVersion(manager.getVersion());
-      if (ZKPropertyTransferServer.getInstance() != null) {
-        String zkPropertyTransferServiceUrl =
-            ZKPropertyTransferServer.getInstance().getWebserviceUrl();
-        if (zkPropertyTransferServiceUrl != null) {
-          leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
-        }
-      } else {
-        LOG.warn("ZKPropertyTransferServer instnace is null");
-      }
-      boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
-      if (success) {
-        return true;
-      } else {
-        LOG.info("Unable to become leader probably because some other controller becames the leader");
-      }
-    } catch (Exception e) {
-      LOG.error(
-          "Exception when trying to updating leader record in cluster:" + manager.getClusterName()
-              + ". Need to check again whether leader node has been created or not", e);
-    }
-
-    leader = accessor.getProperty(keyBuilder.controllerLeader());
-    if (leader != null) {
-      String leaderSessionId = leader.getTypedSessionId().stringify();
-      LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
-          + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
-
-      if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void updateHistory(HelixManager manager) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
-    if (history == null) {
-      history = new LeaderHistory(PropertyType.HISTORY.toString());
-    }
-    history.updateHistory(manager.getClusterName(), manager.getInstanceName());
-    accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 6066859..d78bd9d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -725,6 +725,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   @Test()
   public void testSchedulerMsg4() throws Exception {
     _factory._results.clear();
+    Thread.sleep(2000);
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(

http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java
new file mode 100644
index 0000000..bf89cdb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java
@@ -0,0 +1,199 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.manager.zk.ZkHelixLeaderElection;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Ensure that the external view is able to update properly when participants share a connection.
+ */
+public class TestSharedConnection extends ZkUnitTestBase {
+  /**
+   * Ensure that the external view is able to update properly when participants share a connection.
+   */
+  @Test
+  public void testSharedParticipantConnection() throws Exception {
+    final int NUM_PARTICIPANTS = 2;
+    final int NUM_PARTITIONS = 4;
+    final int NUM_REPLICAS = 2;
+    final String RESOURCE_NAME = "TestDB0";
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "OnlineOffline", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+    // Connect
+    HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+    connection.connect();
+
+    // Start some participants
+    HelixParticipant[] participants = new HelixParticipant[NUM_PARTICIPANTS];
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      participants[i] =
+          connection.createParticipant(ClusterId.from(clusterName),
+              ParticipantId.from("localhost_" + (12918 + i)));
+      participants[i].getStateMachineEngine().registerStateModelFactory(
+          StateModelDefId.from("OnlineOffline"), new TestHelixConnection.MockStateModelFactory());
+      participants[i].startAsync();
+    }
+
+    // Start the controller
+    HelixController controller =
+        connection.createController(ClusterId.from(clusterName), ControllerId.from("controller"));
+    controller.startAsync();
+    Thread.sleep(500);
+
+    // Verify balanced cluster
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // Drop a partition from the first participant
+    HelixAdmin admin = connection.createClusterManagementTool();
+    IdealState idealState = admin.getResourceIdealState(clusterName, RESOURCE_NAME);
+    Map<ParticipantId, State> participantStateMap =
+        idealState.getParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_0"));
+    participantStateMap.remove(ParticipantId.from("localhost_12918"));
+    idealState.setParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_0"), participantStateMap);
+    admin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+    Thread.sleep(1000);
+
+    // Verify balanced cluster
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // Drop a partition from the second participant
+    participantStateMap = idealState.getParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_1"));
+    participantStateMap.remove(ParticipantId.from("localhost_12919"));
+    idealState.setParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_1"), participantStateMap);
+    admin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+    Thread.sleep(1000);
+
+    // Verify balanced cluster
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // Clean up
+    controller.stopAsync();
+    for (HelixParticipant participant : participants) {
+      participant.stopAsync();
+    }
+    admin.dropCluster(clusterName);
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Ensure that only one controller with a shared connection thinks it's leader
+   */
+  @Test
+  public void testSharedControllerConnection() throws Exception {
+    final int NUM_PARTICIPANTS = 2;
+    final int NUM_PARTITIONS = 4;
+    final int NUM_REPLICAS = 2;
+    final int NUM_CONTROLLERS = 2;
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "OnlineOffline", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+    // Connect
+    HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+    connection.connect();
+
+    // Create a couple controllers
+    HelixController[] controllers = new HelixController[NUM_CONTROLLERS];
+    for (int i = 0; i < NUM_CONTROLLERS; i++) {
+      controllers[i] =
+          connection.createController(ClusterId.from(clusterName),
+              ControllerId.from("controller_" + i));
+      controllers[i].startAsync();
+    }
+    Thread.sleep(1000);
+
+    // Now verify that exactly one is leader
+    int leaderCount = 0;
+    for (HelixController controller : controllers) {
+      HelixConnectionAdaptor adaptor = new HelixConnectionAdaptor(controller);
+      boolean result = ZkHelixLeaderElection.tryUpdateController(adaptor);
+      if (result) {
+        leaderCount++;
+      }
+    }
+    Assert.assertEquals(leaderCount, 1);
+
+    // Clean up
+    for (HelixController controller : controllers) {
+      controller.stopAsync();
+    }
+    HelixAdmin admin = connection.createClusterManagementTool();
+    admin.dropCluster(clusterName);
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}


[2/2] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/helix

Posted by ka...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/helix


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f1ffa861
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f1ffa861
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f1ffa861

Branch: refs/heads/master
Commit: f1ffa8619ef21b7521d73b22348399ef79d90cca
Parents: 5cf39b9 6952c8f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Jan 24 17:29:38 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jan 24 17:29:38 2014 -0800

----------------------------------------------------------------------
 .../src/site/markdown/Tutorial.md               |   1 +
 .../src/site/markdown/tutorial_agent.md         | 169 +++++++++++++++++++
 .../src/site/markdown/Tutorial.md               |   1 +
 .../src/site/markdown/tutorial_agent.md         | 169 +++++++++++++++++++
 .../src/site/markdown/Tutorial.md               |   1 +
 .../src/site/markdown/tutorial_agent.md         | 169 +++++++++++++++++++
 website/trunk/src/site/markdown/Tutorial.md     |   1 +
 .../trunk/src/site/markdown/tutorial_agent.md   | 169 +++++++++++++++++++
 8 files changed, 680 insertions(+)
----------------------------------------------------------------------