You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/04/10 08:38:24 UTC
[1/3] git commit: HELIX-425: Porting the fix to handle partition
level constraints from 0.6 to 0.7
Repository: helix
Updated Branches:
refs/heads/master 7937b8a27 -> cd1229e85
HELIX-425: Porting the fix to handle partition level constraints from 0.6 to 0.7
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1a2dd050
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1a2dd050
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1a2dd050
Branch: refs/heads/master
Commit: 1a2dd050ad16a9fdbf4989a6f66e0821f3f43ebf
Parents: 7937b8a
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Wed Apr 9 20:45:03 2014 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Wed Apr 9 20:45:03 2014 -0700
----------------------------------------------------------------------
.../apache/helix/api/config/ClusterConfig.java | 3 +
.../apache/helix/model/ClusterConstraints.java | 3 +
.../TestPartitionLevelTransitionConstraint.java | 266 +++++++++++++++++++
3 files changed, 272 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/1a2dd050/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
index ddc98fa..ef6b577 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -188,6 +188,9 @@ public class ClusterConfig {
case PARTICIPANT:
matchAttributes.put(ConstraintAttribute.INSTANCE, scope.getScopedId().stringify());
break;
+ case PARTITION:
+ matchAttributes.put(ConstraintAttribute.PARTITION, scope.getScopedId().stringify());
+ break;
default:
LOG.error("Unsupported scope for transition constraints: " + scope);
return Integer.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/helix/blob/1a2dd050/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
index daefe6e..a76c514 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -217,6 +217,9 @@ public class ClusterConstraints extends HelixProperty {
Transition.from(msg.getTypedFromState(), msg.getTypedToState()).toString());
}
if (msg.getResourceId() != null) {
+ attributes.put(ConstraintAttribute.PARTITION, msg.getPartitionId().stringify());
+ }
+ if (msg.getResourceId() != null) {
attributes.put(ConstraintAttribute.RESOURCE, msg.getResourceId().stringify());
}
if (msg.getTgtName() != null) {
http://git-wip-us.apache.org/repos/asf/helix/blob/1a2dd050/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
new file mode 100644
index 0000000..a1b64a8
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
@@ -0,0 +1,266 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.builder.ConstraintItemBuilder;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestPartitionLevelTransitionConstraint extends
+ ZkIntegrationTestBase {
+
+ private static Logger LOG = Logger
+ .getLogger(TestPartitionLevelTransitionConstraint.class);
+
+ final Queue<Message> _msgOrderList = new ConcurrentLinkedQueue<Message>();
+
+ public class BootstrapStateModel extends StateModel {
+ public void onBecomeBootstrapFromOffline(Message message,
+ NotificationContext context) {
+ LOG.info("Become Bootstrap from Offline");
+ _msgOrderList.add(message);
+ }
+
+ public void onBecomeOfflineFromBootstrap(Message message,
+ NotificationContext context) {
+ LOG.info("Become Offline from Bootstrap");
+ _msgOrderList.add(message);
+ }
+
+ public void onBecomeSlaveFromBootstrap(Message message,
+ NotificationContext context) {
+ LOG.info("Become Slave from Bootstrap");
+ _msgOrderList.add(message);
+ }
+
+ public void onBecomeBootstrapFromSlave(Message message,
+ NotificationContext context) {
+ LOG.info("Become Bootstrap from Slave");
+ _msgOrderList.add(message);
+ }
+
+ public void onBecomeMasterFromSlave(Message message,
+ NotificationContext context) {
+ LOG.info("Become Master from Slave");
+ _msgOrderList.add(message);
+ }
+
+ public void onBecomeSlaveFromMaster(Message message,
+ NotificationContext context) {
+ LOG.info("Become Slave from Master");
+ _msgOrderList.add(message);
+ }
+
+ }
+
+ public class BootstrapStateModelFactory extends
+ StateModelFactory<BootstrapStateModel> {
+
+ @Override
+ public BootstrapStateModel createNewStateModel(String stateUnitKey) {
+ BootstrapStateModel model = new BootstrapStateModel();
+ return model;
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 1, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave", false); // do not rebalance
+
+ // setup semi-auto ideal-state
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(
+ _gZkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
+ baseAccessor);
+ StateModelDefinition stateModelDef = defineStateModel();
+ accessor.setProperty(accessor.keyBuilder().stateModelDef("Bootstrap"),
+ stateModelDef);
+ IdealState idealState = accessor.getProperty(accessor.keyBuilder()
+ .idealStates("TestDB0"));
+ idealState.setStateModelDefRef("Bootstrap");
+ idealState.setReplicas("2");
+ idealState.getRecord().setListField("TestDB0_0",
+ Arrays.asList("localhost_12919", "localhost_12918"));
+ accessor.setProperty(accessor.keyBuilder().idealStates("TestDB0"),
+ idealState);
+
+ // setup partition-level constraint
+ ConstraintItemBuilder constraintItemBuilder = new ConstraintItemBuilder();
+
+ constraintItemBuilder
+ .addConstraintAttribute(
+ ConstraintAttribute.MESSAGE_TYPE.toString(),
+ "STATE_TRANSITION")
+ .addConstraintAttribute(
+ ConstraintAttribute.PARTITION.toString(), ".*")
+ .addConstraintAttribute(
+ ConstraintAttribute.CONSTRAINT_VALUE.toString(), "1");
+
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.setConstraint(clusterName, ConstraintType.MESSAGE_CONSTRAINT,
+ "constraint1", constraintItemBuilder.build());
+
+ ClusterControllerManager controller = new ClusterControllerManager(
+ ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start 1st participant
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ String instanceName1 = "localhost_12918";
+
+ participants[0] = new MockParticipantManager(ZK_ADDR, clusterName,
+ instanceName1);
+ participants[0].getStateMachineEngine().registerStateModelFactory(
+ "Bootstrap", new BootstrapStateModelFactory());
+ participants[0].syncStart();
+
+ boolean result = ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+ ZK_ADDR, clusterName));
+ Assert.assertTrue(result);
+
+ // start 2nd participant which will be the master for Test0_0
+ String instanceName2 = "localhost_12919";
+ participants[1] = new MockParticipantManager(ZK_ADDR, clusterName,
+ instanceName2);
+ participants[1].getStateMachineEngine().registerStateModelFactory(
+ "Bootstrap", new BootstrapStateModelFactory());
+ participants[1].syncStart();
+
+ result = ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+ ZK_ADDR, clusterName));
+ Assert.assertTrue(result);
+
+ // check we received the message in the right order
+ Assert.assertEquals(_msgOrderList.size(), 7);
+
+ Message[] _msgOrderArray = _msgOrderList.toArray(new Message[0]);
+ assertMessage(_msgOrderArray[0], "OFFLINE", "BOOTSTRAP", instanceName1);
+ assertMessage(_msgOrderArray[1], "BOOTSTRAP", "SLAVE", instanceName1);
+ assertMessage(_msgOrderArray[2], "SLAVE", "MASTER", instanceName1);
+
+ // after we start the 2nd instance, the messages should be received in
+ // the following order:
+ // 1) offline->bootstrap for localhost_12919
+ // 2) bootstrap->slave for localhost_12919
+ // 3) master->slave for localhost_12918
+ // 4) slave->master for localhost_12919
+ assertMessage(_msgOrderArray[3], "OFFLINE", "BOOTSTRAP", instanceName2);
+ assertMessage(_msgOrderArray[4], "BOOTSTRAP", "SLAVE", instanceName2);
+ assertMessage(_msgOrderArray[5], "MASTER", "SLAVE", instanceName1);
+ assertMessage(_msgOrderArray[6], "SLAVE", "MASTER", instanceName2);
+
+ // clean up
+ // wait for all zk callbacks done
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ private static void assertMessage(Message msg, String fromState,
+ String toState, String instance) {
+ Assert.assertEquals(msg.getFromState(), fromState);
+ Assert.assertEquals(msg.getToState(), toState);
+ Assert.assertEquals(msg.getTgtName(), instance);
+ }
+
+ private static StateModelDefinition defineStateModel() {
+ StateModelDefinition.Builder builder = new StateModelDefinition.Builder(
+ "Bootstrap");
+ // Add states and their rank to indicate priority. Lower the rank higher
+ // the priority
+ builder.addState("MASTER", 1);
+ builder.addState("SLAVE", 2);
+ builder.addState("BOOTSTRAP", 3);
+ builder.addState("OFFLINE");
+ builder.addState("DROPPED");
+ // Set the initial state when the node starts
+ builder.initialState("OFFLINE");
+
+ // Add transitions between the states.
+ builder.addTransition("OFFLINE", "BOOTSTRAP", 3);
+ builder.addTransition("BOOTSTRAP", "SLAVE", 2);
+ builder.addTransition("SLAVE", "MASTER", 1);
+ builder.addTransition("MASTER", "SLAVE", 4);
+ builder.addTransition("SLAVE", "OFFLINE", 5);
+ builder.addTransition("OFFLINE", "DROPPED", 6);
+
+ // set constraints on states.
+ // static constraint
+ builder.upperBound("MASTER", 1);
+ // dynamic constraint, R means it should be derived based on the
+ // replication factor.
+ builder.dynamicUpperBound("SLAVE", "R");
+
+ StateModelDefinition statemodelDefinition = builder.build();
+
+ assert (statemodelDefinition.isValid());
+
+ return statemodelDefinition;
+ }
+}
[3/3] git commit: [HELIX-425] Fixing formatting issues with the
previous commit
Posted by ki...@apache.org.
[HELIX-425] Fixing formatting issues with the previous commit
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/cd1229e8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/cd1229e8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/cd1229e8
Branch: refs/heads/master
Commit: cd1229e85d9241abd6746888d8acb9b4532fe1a5
Parents: 69db435
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Wed Apr 9 23:17:02 2014 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Wed Apr 9 23:17:02 2014 -0700
----------------------------------------------------------------------
.../apache/helix/api/config/ClusterConfig.java | 4 +-
.../apache/helix/model/ClusterConstraints.java | 2 +-
.../TestPartitionLevelTransitionConstraint.java | 405 +++++++++----------
3 files changed, 194 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/cd1229e8/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
index ef6b577..d5dd337 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -189,8 +189,8 @@ public class ClusterConfig {
matchAttributes.put(ConstraintAttribute.INSTANCE, scope.getScopedId().stringify());
break;
case PARTITION:
- matchAttributes.put(ConstraintAttribute.PARTITION, scope.getScopedId().stringify());
- break;
+ matchAttributes.put(ConstraintAttribute.PARTITION, scope.getScopedId().stringify());
+ break;
default:
LOG.error("Unsupported scope for transition constraints: " + scope);
return Integer.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/helix/blob/cd1229e8/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
index a76c514..7536ca5 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -217,7 +217,7 @@ public class ClusterConstraints extends HelixProperty {
Transition.from(msg.getTypedFromState(), msg.getTypedToState()).toString());
}
if (msg.getResourceId() != null) {
- attributes.put(ConstraintAttribute.PARTITION, msg.getPartitionId().stringify());
+ attributes.put(ConstraintAttribute.PARTITION, msg.getPartitionId().stringify());
}
if (msg.getResourceId() != null) {
attributes.put(ConstraintAttribute.RESOURCE, msg.getResourceId().stringify());
http://git-wip-us.apache.org/repos/asf/helix/blob/cd1229e8/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
index be1ab99..b33ab23 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
@@ -1,6 +1,5 @@
package org.apache.helix.integration;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -49,217 +48,195 @@ import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TestPartitionLevelTransitionConstraint extends
- ZkIntegrationTestBase {
-
- private static Logger LOG = Logger
- .getLogger(TestPartitionLevelTransitionConstraint.class);
-
- final Queue<Message> _msgOrderList = new ConcurrentLinkedQueue<Message>();
-
- public class BootstrapStateModel extends StateModel {
- public void onBecomeBootstrapFromOffline(Message message,
- NotificationContext context) {
- LOG.info("Become Bootstrap from Offline");
- _msgOrderList.add(message);
- }
-
- public void onBecomeOfflineFromBootstrap(Message message,
- NotificationContext context) {
- LOG.info("Become Offline from Bootstrap");
- _msgOrderList.add(message);
- }
-
- public void onBecomeSlaveFromBootstrap(Message message,
- NotificationContext context) {
- LOG.info("Become Slave from Bootstrap");
- _msgOrderList.add(message);
- }
-
- public void onBecomeBootstrapFromSlave(Message message,
- NotificationContext context) {
- LOG.info("Become Bootstrap from Slave");
- _msgOrderList.add(message);
- }
-
- public void onBecomeMasterFromSlave(Message message,
- NotificationContext context) {
- LOG.info("Become Master from Slave");
- _msgOrderList.add(message);
- }
-
- public void onBecomeSlaveFromMaster(Message message,
- NotificationContext context) {
- LOG.info("Become Slave from Master");
- _msgOrderList.add(message);
- }
-
- }
-
- public class BootstrapStateModelFactory extends
- StateModelFactory<BootstrapStateModel> {
-
- @Override
- public BootstrapStateModel createNewStateModel(String stateUnitKey) {
- BootstrapStateModel model = new BootstrapStateModel();
- return model;
- }
- }
-
- @Test
- public void test() throws Exception {
- // Logger.getRootLogger().setLevel(Level.INFO);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- int n = 2;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 1, // partitions per resource
- n, // number of nodes
- 2, // replicas
- "MasterSlave", false); // do not rebalance
-
- // setup semi-auto ideal-state
- BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(
- _gZkClient);
- HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
- baseAccessor);
- StateModelDefinition stateModelDef = defineStateModel();
- accessor.setProperty(accessor.keyBuilder().stateModelDef("Bootstrap"),
- stateModelDef);
- IdealState idealState = accessor.getProperty(accessor.keyBuilder()
- .idealStates("TestDB0"));
- idealState.setStateModelDefRef("Bootstrap");
- idealState.setReplicas("2");
- idealState.getRecord().setListField("TestDB0_0",
- Arrays.asList("localhost_12919", "localhost_12918"));
- accessor.setProperty(accessor.keyBuilder().idealStates("TestDB0"),
- idealState);
-
- // setup partition-level constraint
- ConstraintItemBuilder constraintItemBuilder = new ConstraintItemBuilder();
-
- constraintItemBuilder
- .addConstraintAttribute(
- ConstraintAttribute.MESSAGE_TYPE.toString(),
- "STATE_TRANSITION")
- .addConstraintAttribute(
- ConstraintAttribute.PARTITION.toString(), ".*")
- .addConstraintAttribute(
- ConstraintAttribute.CONSTRAINT_VALUE.toString(), "1");
-
- HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
- admin.setConstraint(clusterName, ConstraintType.MESSAGE_CONSTRAINT,
- "constraint1", constraintItemBuilder.build());
-
- ClusterControllerManager controller = new ClusterControllerManager(
- ZK_ADDR, clusterName, "controller");
- controller.syncStart();
-
- // start 1st participant
- MockParticipantManager[] participants = new MockParticipantManager[n];
- String instanceName1 = "localhost_12918";
-
- participants[0] = new MockParticipantManager(ZK_ADDR, clusterName,
- instanceName1);
- participants[0].getStateMachineEngine().registerStateModelFactory(
- "Bootstrap", new BootstrapStateModelFactory());
- participants[0].syncStart();
-
- boolean result = ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
- ZK_ADDR, clusterName));
- Assert.assertTrue(result);
-
- // start 2nd participant which will be the master for Test0_0
- String instanceName2 = "localhost_12919";
- participants[1] = new MockParticipantManager(ZK_ADDR, clusterName,
- instanceName2);
- participants[1].getStateMachineEngine().registerStateModelFactory(
- "Bootstrap", new BootstrapStateModelFactory());
- participants[1].syncStart();
-
- result = ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
- ZK_ADDR, clusterName));
- Assert.assertTrue(result);
-
- // check we received the message in the right order
- Assert.assertEquals(_msgOrderList.size(), 7);
-
- Message[] _msgOrderArray = _msgOrderList.toArray(new Message[0]);
- assertMessage(_msgOrderArray[0], "OFFLINE", "BOOTSTRAP", instanceName1);
- assertMessage(_msgOrderArray[1], "BOOTSTRAP", "SLAVE", instanceName1);
- assertMessage(_msgOrderArray[2], "SLAVE", "MASTER", instanceName1);
-
- // after we start the 2nd instance, the messages should be received in
- // the following order:
- // 1) offline->bootstrap for localhost_12919
- // 2) bootstrap->slave for localhost_12919
- // 3) master->slave for localhost_12918
- // 4) slave->master for localhost_12919
- assertMessage(_msgOrderArray[3], "OFFLINE", "BOOTSTRAP", instanceName2);
- assertMessage(_msgOrderArray[4], "BOOTSTRAP", "SLAVE", instanceName2);
- assertMessage(_msgOrderArray[5], "MASTER", "SLAVE", instanceName1);
- assertMessage(_msgOrderArray[6], "SLAVE", "MASTER", instanceName2);
-
- // clean up
- // wait for all zk callbacks done
- controller.syncStop();
- for (int i = 0; i < n; i++) {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
- }
-
- private static void assertMessage(Message msg, String fromState,
- String toState, String instance) {
- Assert.assertEquals(msg.getFromState(), fromState);
- Assert.assertEquals(msg.getToState(), toState);
- Assert.assertEquals(msg.getTgtName(), instance);
- }
-
- private static StateModelDefinition defineStateModel() {
- StateModelDefinition.Builder builder = new StateModelDefinition.Builder(
- "Bootstrap");
- // Add states and their rank to indicate priority. Lower the rank higher
- // the priority
- builder.addState("MASTER", 1);
- builder.addState("SLAVE", 2);
- builder.addState("BOOTSTRAP", 3);
- builder.addState("OFFLINE");
- builder.addState("DROPPED");
- // Set the initial state when the node starts
- builder.initialState("OFFLINE");
-
- // Add transitions between the states.
- builder.addTransition("OFFLINE", "BOOTSTRAP", 3);
- builder.addTransition("BOOTSTRAP", "SLAVE", 2);
- builder.addTransition("SLAVE", "MASTER", 1);
- builder.addTransition("MASTER", "SLAVE", 4);
- builder.addTransition("SLAVE", "OFFLINE", 5);
- builder.addTransition("OFFLINE", "DROPPED", 6);
-
- // set constraints on states.
- // static constraint
- builder.upperBound("MASTER", 1);
- // dynamic constraint, R means it should be derived based on the
- // replication factor.
- builder.dynamicUpperBound("SLAVE", "R");
-
- StateModelDefinition statemodelDefinition = builder.build();
-
- assert (statemodelDefinition.isValid());
-
- return statemodelDefinition;
- }
+public class TestPartitionLevelTransitionConstraint extends ZkIntegrationTestBase {
+
+ private static Logger LOG = Logger.getLogger(TestPartitionLevelTransitionConstraint.class);
+
+ final Queue<Message> _msgOrderList = new ConcurrentLinkedQueue<Message>();
+
+ public class BootstrapStateModel extends StateModel {
+ public void onBecomeBootstrapFromOffline(Message message, NotificationContext context) {
+ LOG.info("Become Bootstrap from Offline");
+ _msgOrderList.add(message);
+ }
+
+ public void onBecomeOfflineFromBootstrap(Message message, NotificationContext context) {
+ LOG.info("Become Offline from Bootstrap");
+ _msgOrderList.add(message);
+ }
+
+ public void onBecomeSlaveFromBootstrap(Message message, NotificationContext context) {
+ LOG.info("Become Slave from Bootstrap");
+ _msgOrderList.add(message);
+ }
+
+ public void onBecomeBootstrapFromSlave(Message message, NotificationContext context) {
+ LOG.info("Become Bootstrap from Slave");
+ _msgOrderList.add(message);
+ }
+
+ public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
+ LOG.info("Become Master from Slave");
+ _msgOrderList.add(message);
+ }
+
+ public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
+ LOG.info("Become Slave from Master");
+ _msgOrderList.add(message);
+ }
+
+ }
+
+ public class BootstrapStateModelFactory extends StateModelFactory<BootstrapStateModel> {
+
+ @Override
+ public BootstrapStateModel createNewStateModel(String stateUnitKey) {
+ BootstrapStateModel model = new BootstrapStateModel();
+ return model;
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 1, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave", false); // do not rebalance
+
+ // setup semi-auto ideal-state
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ StateModelDefinition stateModelDef = defineStateModel();
+ accessor.setProperty(accessor.keyBuilder().stateModelDef("Bootstrap"), stateModelDef);
+ IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB0"));
+ idealState.setStateModelDefRef("Bootstrap");
+ idealState.setReplicas("2");
+ idealState.getRecord().setListField("TestDB0_0",
+ Arrays.asList("localhost_12919", "localhost_12918"));
+ accessor.setProperty(accessor.keyBuilder().idealStates("TestDB0"), idealState);
+
+ // setup partition-level constraint
+ ConstraintItemBuilder constraintItemBuilder = new ConstraintItemBuilder();
+
+ constraintItemBuilder
+ .addConstraintAttribute(ConstraintAttribute.MESSAGE_TYPE.toString(), "STATE_TRANSITION")
+ .addConstraintAttribute(ConstraintAttribute.PARTITION.toString(), ".*")
+ .addConstraintAttribute(ConstraintAttribute.CONSTRAINT_VALUE.toString(), "1");
+
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.setConstraint(clusterName, ConstraintType.MESSAGE_CONSTRAINT, "constraint1",
+ constraintItemBuilder.build());
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start 1st participant
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ String instanceName1 = "localhost_12918";
+
+ participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName1);
+ participants[0].getStateMachineEngine().registerStateModelFactory("Bootstrap",
+ new BootstrapStateModelFactory());
+ participants[0].syncStart();
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // start 2nd participant which will be the master for Test0_0
+ String instanceName2 = "localhost_12919";
+ participants[1] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName2);
+ participants[1].getStateMachineEngine().registerStateModelFactory("Bootstrap",
+ new BootstrapStateModelFactory());
+ participants[1].syncStart();
+
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // check we received the message in the right order
+ Assert.assertEquals(_msgOrderList.size(), 7);
+
+ Message[] _msgOrderArray = _msgOrderList.toArray(new Message[0]);
+ assertMessage(_msgOrderArray[0], "OFFLINE", "BOOTSTRAP", instanceName1);
+ assertMessage(_msgOrderArray[1], "BOOTSTRAP", "SLAVE", instanceName1);
+ assertMessage(_msgOrderArray[2], "SLAVE", "MASTER", instanceName1);
+
+ // after we start the 2nd instance, the messages should be received in
+ // the following order:
+ // 1) offline->bootstrap for localhost_12919
+ // 2) bootstrap->slave for localhost_12919
+ // 3) master->slave for localhost_12918
+ // 4) slave->master for localhost_12919
+ assertMessage(_msgOrderArray[3], "OFFLINE", "BOOTSTRAP", instanceName2);
+ assertMessage(_msgOrderArray[4], "BOOTSTRAP", "SLAVE", instanceName2);
+ assertMessage(_msgOrderArray[5], "MASTER", "SLAVE", instanceName1);
+ assertMessage(_msgOrderArray[6], "SLAVE", "MASTER", instanceName2);
+
+ // clean up
+ // wait for all zk callbacks done
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ private static void assertMessage(Message msg, String fromState, String toState, String instance) {
+ Assert.assertEquals(msg.getFromState(), fromState);
+ Assert.assertEquals(msg.getToState(), toState);
+ Assert.assertEquals(msg.getTgtName(), instance);
+ }
+
+ private static StateModelDefinition defineStateModel() {
+ StateModelDefinition.Builder builder = new StateModelDefinition.Builder("Bootstrap");
+ // Add states and their rank to indicate priority. Lower the rank higher
+ // the priority
+ builder.addState("MASTER", 1);
+ builder.addState("SLAVE", 2);
+ builder.addState("BOOTSTRAP", 3);
+ builder.addState("OFFLINE");
+ builder.addState("DROPPED");
+ // Set the initial state when the node starts
+ builder.initialState("OFFLINE");
+
+ // Add transitions between the states.
+ builder.addTransition("OFFLINE", "BOOTSTRAP", 3);
+ builder.addTransition("BOOTSTRAP", "SLAVE", 2);
+ builder.addTransition("SLAVE", "MASTER", 1);
+ builder.addTransition("MASTER", "SLAVE", 4);
+ builder.addTransition("SLAVE", "OFFLINE", 5);
+ builder.addTransition("OFFLINE", "DROPPED", 6);
+
+ // set constraints on states.
+ // static constraint
+ builder.upperBound("MASTER", 1);
+ // dynamic constraint, R means it should be derived based on the
+ // replication factor.
+ builder.dynamicUpperBound("SLAVE", "R");
+
+ StateModelDefinition statemodelDefinition = builder.build();
+
+ assert (statemodelDefinition.isValid());
+
+ return statemodelDefinition;
+ }
}
[2/3] git commit: [HELIX-425]: Porting the fix to handle partition
level constraints from 0.6 to 0.7
Posted by ki...@apache.org.
[HELIX-425]: Porting the fix to handle partition level constraints from 0.6 to 0.7
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/69db4357
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/69db4357
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/69db4357
Branch: refs/heads/master
Commit: 69db435724d7d3f6b2e6b208eadad8d6823fc320
Parents: 1a2dd05
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Wed Apr 9 20:46:44 2014 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Wed Apr 9 20:49:56 2014 -0700
----------------------------------------------------------------------
.../helix/integration/TestPartitionLevelTransitionConstraint.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/69db4357/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
index a1b64a8..be1ab99 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
@@ -1,5 +1,6 @@
package org.apache.helix.integration;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,10 +20,8 @@ package org.apache.helix.integration;
* under the License.
*/
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
-import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;