You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/01/25 21:48:56 UTC

[05/50] [abbrv] helix git commit: Add P2P (Participant-to-Participant) state-transition message support in Helix controller.

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
new file mode 100644
index 0000000..1a21ef9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -0,0 +1,176 @@
+package org.apache.helix.messaging.p2pMessage;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BaseStageTest;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageSelectionStageOutput;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestP2PStateTransitionMessages extends BaseStageTest {
+  String db = "testDB";
+  int numPartition = 1;
+  int numReplica = 3;
+
+
+  private void preSetup() {
+    setupIdealState(3, new String[]{db}, numPartition, numReplica, IdealState.RebalanceMode.SEMI_AUTO,
+        BuiltInStateModelDefinitions.MasterSlave.name());
+    setupStateModel();
+    setupInstances(3);
+    setupLiveInstances(3);
+  }
+
+  @Test
+  public void testP2PMessageEnabled() throws Exception {
+    preSetup();
+    ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
+    clusterConfig.enableP2PMessage(true);
+    setClusterConfig(clusterConfig);
+
+    testP2PMessage(clusterConfig, true);
+  }
+
+  @Test
+  public void testP2PMessageDisabled() throws Exception {
+    preSetup();
+    testP2PMessage(null, false);
+  }
+
+  private void testP2PMessage(ClusterConfig clusterConfig, Boolean p2pMessageEnabled) throws Exception {
+    Map<String, Resource> resourceMap =
+        getResourceMap(new String[]{db}, numPartition, BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig,
+            null);
+
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+
+    Pipeline pipeline = createPipeline();
+    pipeline.handle(event);
+
+    BestPossibleStateOutput bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+    CurrentStateOutput currentStateOutput = populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    Partition p = new Partition(db + "_0");
+
+    String masterInstance =
+        getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p), MasterSlaveSMD.States.MASTER.name());
+    Assert.assertNotNull(masterInstance);
+
+    admin.enableInstance(_clusterName, masterInstance, false);
+    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    cache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
+
+    pipeline.handle(event);
+
+    bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+    MessageSelectionStageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    List<Message> messages = messageOutput.getMessages(db, p);
+
+    Assert.assertEquals(messages.size(), 1);
+    Message message = messages.get(0);
+    Assert.assertEquals(message.getTgtName(), masterInstance);
+    Assert.assertEquals(message.getFromState(), MasterSlaveSMD.States.MASTER.name());
+    Assert.assertEquals(message.getToState(), MasterSlaveSMD.States.SLAVE.name());
+
+    if (p2pMessageEnabled) {
+      Assert.assertEquals(message.getRelayMessages().entrySet().size(), 1);
+      String newMasterInstance =
+          getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p), MasterSlaveSMD.States.MASTER.name());
+
+      Message relayMessage = message.getRelayMessage(newMasterInstance);
+      Assert.assertNotNull(relayMessage);
+      Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+      Assert.assertEquals(relayMessage.getTgtName(), newMasterInstance);
+      Assert.assertEquals(relayMessage.getRelaySrcHost(), masterInstance);
+      Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+      Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
+    } else {
+      Assert.assertTrue(message.getRelayMessages().entrySet().isEmpty());
+    }
+  }
+
+  private String getTopStateInstance(Map<String, String> instanceStateMap, String topState) {
+    String masterInstance = null;
+    for (Map.Entry<String, String> e : instanceStateMap.entrySet()) {
+      if (topState.equals(e.getValue())) {
+        masterInstance = e.getKey();
+      }
+    }
+
+    return masterInstance;
+  }
+
+  private CurrentStateOutput populateCurrentStateFromBestPossible(BestPossibleStateOutput bestPossibleStateOutput) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    for (String resource : bestPossibleStateOutput.getResourceStatesMap().keySet()) {
+      PartitionStateMap partitionStateMap = bestPossibleStateOutput.getPartitionStateMap(resource);
+      for (Partition p : partitionStateMap.partitionSet()) {
+        Map<String, String> stateMap = partitionStateMap.getPartitionMap(p);
+
+        for (Map.Entry<String, String> e : stateMap.entrySet()) {
+          currentStateOutput.setCurrentState(resource, p, e.getKey(), e.getValue());
+        }
+      }
+    }
+    return currentStateOutput;
+  }
+
+  private Pipeline createPipeline() {
+    Pipeline pipeline = new Pipeline("test");
+    pipeline.addStage(new ReadClusterDataStage());
+    pipeline.addStage(new BestPossibleStateCalcStage());
+    pipeline.addStage(new IntermediateStateCalcStage());
+    pipeline.addStage(new MessageGenerationPhase());
+    pipeline.addStage(new MessageSelectionStage());
+    pipeline.addStage(new MessageThrottleStage());
+
+    return pipeline;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
index f84565b..77da401 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
@@ -150,7 +150,7 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
 
   @Override
   public List<ZNRecord> get(List<String> paths, List<Stat> stats, int options) {
-    List<ZNRecord> records = new ArrayList<ZNRecord>();
+    List<ZNRecord> records = new ArrayList<>();
     for (int i = 0; i < paths.size(); i++) {
       ZNRecord record = get(paths.get(i), stats.get(i), options);
       records.add(record);
@@ -160,7 +160,7 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
 
   @Override
   public List<ZNRecord> getChildren(String parentPath, List<Stat> stats, int options) {
-    List<ZNRecord> children = new ArrayList<ZNRecord>();
+    List<ZNRecord> children = new ArrayList<>();
     for (String key : _recordMap.keySet()) {
       if (key.startsWith(parentPath)) {
         String[] keySplit = key.split("\\/");
@@ -182,7 +182,7 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
 
   @Override
   public List<String> getChildNames(String parentPath, int options) {
-    List<String> child = new ArrayList<String>();
+    List<String> child = new ArrayList<>();
     for (String key : _recordMap.keySet()) {
       if (key.startsWith(parentPath)) {
         String[] keySplit = key.split("\\/");

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 8679007..037d92b 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -215,8 +216,19 @@ public class MockHelixAdmin implements HelixAdmin {
 
   }
 
-  @Override public void enableInstance(String clusterName, String instanceName, boolean enabled) {
+  @Override
+  public void enableInstance(String clusterName, String instanceName, boolean enabled) {
+    String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
+    if (!_baseDataAccessor.exists(instanceConfigsPath, 0)) {
+      _baseDataAccessor.create(instanceConfigsPath, new ZNRecord(instanceName), 0);
+    }
+
+    String instanceConfigPath = instanceConfigsPath + "/" + instanceName;
 
+    ZNRecord  record = (ZNRecord) _baseDataAccessor.get(instanceConfigPath, null, 0);
+    InstanceConfig instanceConfig = new InstanceConfig(record);
+    instanceConfig.setInstanceEnabled(enabled);
+    _baseDataAccessor.set(instanceConfigPath, instanceConfig.getRecord(), 0);
   }
 
   @Override public void enableInstance(String clusterName, List<String> instances,

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
index 8c05626..143f3c0 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
@@ -28,6 +28,8 @@ import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -42,7 +44,8 @@ import java.util.Map;
 public class TestDisableResourceMbean extends ZkUnitTestBase {
   private MBeanServerConnection _mbeanServer = ManagementFactory.getPlatformMBeanServer();
 
-  @Test public void testDisableResourceMonitoring() throws Exception {
+  @Test
+  public void testDisableResourceMonitoring() throws Exception {
     final int NUM_PARTICIPANTS = 2;
     String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -83,7 +86,9 @@ public class TestDisableResourceMbean extends ZkUnitTestBase {
         new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
-    Thread.sleep(300);
+    HelixClusterVerifier clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient).build();
+    Assert.assertTrue(clusterVerifier.verify());
 
     // Verify the bean was created for TestDB0, but not for TestDB1.
     Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB0", clusterName)));

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
index 6f1b083..51b048d 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
@@ -47,15 +47,13 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
   public final static String TEST_RESOURCE = "TestResource";
   public final static String PARTITION = "PARTITION";
 
-
   public void preSetup() {
     setupLiveInstances(3);
     setupStateModel();
     Resource resource = new Resource(TEST_RESOURCE);
     resource.setStateModelDefRef("MasterSlave");
     resource.addPartition(PARTITION);
-    event.addAttribute(AttributeName.RESOURCES.name(),
-        Collections.singletonMap(TEST_RESOURCE, resource));
+    event.addAttribute(AttributeName.RESOURCES.name(), Collections.singletonMap(TEST_RESOURCE, resource));
     event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor("TestCluster"));
   }