You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/01/25 21:48:56 UTC
[05/50] [abbrv] helix git commit: Add P2P
(Participant-to-Participant) state-transition message support in Helix
controller.
http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
new file mode 100644
index 0000000..1a21ef9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -0,0 +1,176 @@
+package org.apache.helix.messaging.p2pMessage;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BaseStageTest;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageSelectionStageOutput;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestP2PStateTransitionMessages extends BaseStageTest {
+ String db = "testDB";
+ int numPartition = 1;
+ int numReplica = 3;
+
+
+ private void preSetup() {
+ setupIdealState(3, new String[]{db}, numPartition, numReplica, IdealState.RebalanceMode.SEMI_AUTO,
+ BuiltInStateModelDefinitions.MasterSlave.name());
+ setupStateModel();
+ setupInstances(3);
+ setupLiveInstances(3);
+ }
+
+ @Test
+ public void testP2PMessageEnabled() throws Exception {
+ preSetup();
+ ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
+ clusterConfig.enableP2PMessage(true);
+ setClusterConfig(clusterConfig);
+
+ testP2PMessage(clusterConfig, true);
+ }
+
+ @Test
+ public void testP2PMessageDisabled() throws Exception {
+ preSetup();
+ testP2PMessage(null, false);
+ }
+
+ private void testP2PMessage(ClusterConfig clusterConfig, Boolean p2pMessageEnabled) throws Exception {
+ Map<String, Resource> resourceMap =
+ getResourceMap(new String[]{db}, numPartition, BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig,
+ null);
+
+ event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+ event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+ event.addAttribute(AttributeName.helixmanager.name(), manager);
+
+ Pipeline pipeline = createPipeline();
+ pipeline.handle(event);
+
+ BestPossibleStateOutput bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+ CurrentStateOutput currentStateOutput = populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ Partition p = new Partition(db + "_0");
+
+ String masterInstance =
+ getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p), MasterSlaveSMD.States.MASTER.name());
+ Assert.assertNotNull(masterInstance);
+
+ admin.enableInstance(_clusterName, masterInstance, false);
+ ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+ cache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
+
+ pipeline.handle(event);
+
+ bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+ MessageSelectionStageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ List<Message> messages = messageOutput.getMessages(db, p);
+
+ Assert.assertEquals(messages.size(), 1);
+ Message message = messages.get(0);
+ Assert.assertEquals(message.getTgtName(), masterInstance);
+ Assert.assertEquals(message.getFromState(), MasterSlaveSMD.States.MASTER.name());
+ Assert.assertEquals(message.getToState(), MasterSlaveSMD.States.SLAVE.name());
+
+ if (p2pMessageEnabled) {
+ Assert.assertEquals(message.getRelayMessages().entrySet().size(), 1);
+ String newMasterInstance =
+ getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p), MasterSlaveSMD.States.MASTER.name());
+
+ Message relayMessage = message.getRelayMessage(newMasterInstance);
+ Assert.assertNotNull(relayMessage);
+ Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+ Assert.assertEquals(relayMessage.getTgtName(), newMasterInstance);
+ Assert.assertEquals(relayMessage.getRelaySrcHost(), masterInstance);
+ Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+ Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
+ } else {
+ Assert.assertTrue(message.getRelayMessages().entrySet().isEmpty());
+ }
+ }
+
+ private String getTopStateInstance(Map<String, String> instanceStateMap, String topState) {
+ String masterInstance = null;
+ for (Map.Entry<String, String> e : instanceStateMap.entrySet()) {
+ if (topState.equals(e.getValue())) {
+ masterInstance = e.getKey();
+ }
+ }
+
+ return masterInstance;
+ }
+
+ private CurrentStateOutput populateCurrentStateFromBestPossible(BestPossibleStateOutput bestPossibleStateOutput) {
+ CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ for (String resource : bestPossibleStateOutput.getResourceStatesMap().keySet()) {
+ PartitionStateMap partitionStateMap = bestPossibleStateOutput.getPartitionStateMap(resource);
+ for (Partition p : partitionStateMap.partitionSet()) {
+ Map<String, String> stateMap = partitionStateMap.getPartitionMap(p);
+
+ for (Map.Entry<String, String> e : stateMap.entrySet()) {
+ currentStateOutput.setCurrentState(resource, p, e.getKey(), e.getValue());
+ }
+ }
+ }
+ return currentStateOutput;
+ }
+
+ private Pipeline createPipeline() {
+ Pipeline pipeline = new Pipeline("test");
+ pipeline.addStage(new ReadClusterDataStage());
+ pipeline.addStage(new BestPossibleStateCalcStage());
+ pipeline.addStage(new IntermediateStateCalcStage());
+ pipeline.addStage(new MessageGenerationPhase());
+ pipeline.addStage(new MessageSelectionStage());
+ pipeline.addStage(new MessageThrottleStage());
+
+ return pipeline;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
index f84565b..77da401 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
@@ -150,7 +150,7 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
@Override
public List<ZNRecord> get(List<String> paths, List<Stat> stats, int options) {
- List<ZNRecord> records = new ArrayList<ZNRecord>();
+ List<ZNRecord> records = new ArrayList<>();
for (int i = 0; i < paths.size(); i++) {
ZNRecord record = get(paths.get(i), stats.get(i), options);
records.add(record);
@@ -160,7 +160,7 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
@Override
public List<ZNRecord> getChildren(String parentPath, List<Stat> stats, int options) {
- List<ZNRecord> children = new ArrayList<ZNRecord>();
+ List<ZNRecord> children = new ArrayList<>();
for (String key : _recordMap.keySet()) {
if (key.startsWith(parentPath)) {
String[] keySplit = key.split("\\/");
@@ -182,7 +182,7 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
@Override
public List<String> getChildNames(String parentPath, int options) {
- List<String> child = new ArrayList<String>();
+ List<String> child = new ArrayList<>();
for (String key : _recordMap.keySet()) {
if (key.startsWith(parentPath)) {
String[] keySplit = key.split("\\/");
http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 8679007..037d92b 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
@@ -215,8 +216,19 @@ public class MockHelixAdmin implements HelixAdmin {
}
- @Override public void enableInstance(String clusterName, String instanceName, boolean enabled) {
+ @Override
+ public void enableInstance(String clusterName, String instanceName, boolean enabled) {
+ String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
+ if (!_baseDataAccessor.exists(instanceConfigsPath, 0)) {
+ _baseDataAccessor.create(instanceConfigsPath, new ZNRecord(instanceName), 0);
+ }
+
+ String instanceConfigPath = instanceConfigsPath + "/" + instanceName;
+ ZNRecord record = (ZNRecord) _baseDataAccessor.get(instanceConfigPath, null, 0);
+ InstanceConfig instanceConfig = new InstanceConfig(record);
+ instanceConfig.setInstanceEnabled(enabled);
+ _baseDataAccessor.set(instanceConfigPath, instanceConfig.getRecord(), 0);
}
@Override public void enableInstance(String clusterName, List<String> instances,
http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
index 8c05626..143f3c0 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
@@ -28,6 +28,8 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -42,7 +44,8 @@ import java.util.Map;
public class TestDisableResourceMbean extends ZkUnitTestBase {
private MBeanServerConnection _mbeanServer = ManagementFactory.getPlatformMBeanServer();
- @Test public void testDisableResourceMonitoring() throws Exception {
+ @Test
+ public void testDisableResourceMonitoring() throws Exception {
final int NUM_PARTICIPANTS = 2;
String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -83,7 +86,9 @@ public class TestDisableResourceMbean extends ZkUnitTestBase {
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
- Thread.sleep(300);
+ HelixClusterVerifier clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient).build();
+ Assert.assertTrue(clusterVerifier.verify());
// Verify the bean was created for TestDB0, but not for TestDB1.
Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB0", clusterName)));
http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
index 6f1b083..51b048d 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
@@ -47,15 +47,13 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
public final static String TEST_RESOURCE = "TestResource";
public final static String PARTITION = "PARTITION";
-
public void preSetup() {
setupLiveInstances(3);
setupStateModel();
Resource resource = new Resource(TEST_RESOURCE);
resource.setStateModelDefRef("MasterSlave");
resource.addPartition(PARTITION);
- event.addAttribute(AttributeName.RESOURCES.name(),
- Collections.singletonMap(TEST_RESOURCE, resource));
+ event.addAttribute(AttributeName.RESOURCES.name(), Collections.singletonMap(TEST_RESOURCE, resource));
event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor("TestCluster"));
}