You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/11 22:09:57 UTC
[03/10] [HELIX-279] Apply gc handling fixes to ZKHelixManager
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 8558b18..30f5807 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration;
* under the License.
*/
+import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashSet;
@@ -28,20 +29,19 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
import org.apache.helix.Criteria;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.SessionId;
+import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -52,8 +52,13 @@ import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StatusUpdate;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.testng.Assert;
@@ -70,6 +75,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
@Override
public void onTimeOut() {
// TODO Auto-generated method stub
+
}
@Override
@@ -78,20 +84,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
}
- TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
+ final TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
- int cnt;
public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
- public TestMessagingHandlerFactory() {
- super();
- cnt = 0;
- }
-
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
- // System.out.println("\t create-hdlr: " + message.getId());
return new TestMessagingHandler(message, context);
}
@@ -116,20 +115,73 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
public HelixTaskResult handleMessage() throws InterruptedException {
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
- // String tgtName = _message.getTgtName();
- String messageId = _message.getMessageId().stringify();
- String partitionId = _message.getPartitionId().stringify();
+ String destName = _message.getTgtName();
+ String partitionName = _message.getPartitionName();
+ result.getTaskResultMap().put("Message", _message.getMsgId());
+ synchronized (_results) {
+ if (!_results.containsKey(partitionName)) {
+ _results.put(partitionName, new HashSet<String>());
+ }
+ _results.get(partitionName).add(_message.getMsgId());
+ }
+ // System.err.println("handle msg: " + _message.getPartitionName() + ", from: "
+ // + _message.getFromState() + ", to: " + _message.getToState());
+ return result;
+ }
- result.getTaskResultMap().put("Message", messageId);
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type) {
+ // TODO Auto-generated method stub
+ }
+ }
+ }
+
+ public static class TestMessagingHandlerFactoryLatch implements MessageHandlerFactory {
+ public volatile CountDownLatch _latch = new CountDownLatch(1);
+ public int _messageCount = 0;
+ public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
+
+ @Override
+ public synchronized MessageHandler createHandler(Message message, NotificationContext context) {
+ _messageCount++;
+ return new TestMessagingHandlerLatch(message, context);
+ }
+
+ public synchronized void signal() {
+ _latch.countDown();
+ _latch = new CountDownLatch(1);
+ }
+
+ @Override
+ public String getMessageType() {
+ return "TestMessagingHandlerLatch";
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+ }
+
+ public class TestMessagingHandlerLatch extends MessageHandler {
+ public TestMessagingHandlerLatch(Message message, NotificationContext context) {
+ super(message, context);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException {
+ _latch.await();
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ result.getTaskResultMap().put("Message", _message.getMsgId());
+ String destName = _message.getTgtName();
synchronized (_results) {
- if (!_results.containsKey(partitionId)) {
- _results.put(partitionId, new HashSet<String>());
+ if (!_results.containsKey(_message.getPartitionName())) {
+ _results.put(_message.getPartitionName(), new ConcurrentSkipListSet<String>());
}
- _results.get(partitionId).add(messageId);
}
- cnt++;
- // System.err.println(cnt + ": message " + messageId + ", tgtName: " + tgtName
- // + ", partitionId: " + partitionId);
+ _results.get(_message.getPartitionName()).add(destName);
+ // System.err.println("Message " + _message.getMsgId() + " executed");
return result;
}
@@ -141,28 +193,116 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
+ public void testSchedulerMsgUsingQueue() throws Exception {
+ Logger.getRootLogger().setLevel(Level.INFO);
+ _factory._results.clear();
+ HelixManager manager = null;
+ for (int i = 0; i < NODE_NR; i++) {
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
+ _factory.getMessageType(), _factory);
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
+ }
+
+ Message schedulerMessage =
+ new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
+ schedulerMessage.setTgtName("CONTROLLER");
+ // TODO: change it to "ADMIN" ?
+ schedulerMessage.setSrcName("CONTROLLER");
+ schedulerMessage.getRecord().setSimpleField(
+ DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
+ // Template for the individual message sent to each participant
+ Message msg = new Message(_factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
+ msg.setMsgState(MessageState.NEW);
+
+ // Criteria to send individual messages
+ Criteria cr = new Criteria();
+ cr.setInstanceName("localhost_%");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ cr.setResource("%");
+ cr.setPartition("%");
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ String crString = sw.toString();
+
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ helixDataAccessor.createProperty(keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
+ schedulerMessage);
+
+ for (int i = 0; i < 30; i++) {
+ Thread.sleep(2000);
+ if (_PARTITIONS == _factory._results.size()) {
+ break;
+ }
+ }
+
+ Assert.assertEquals(_PARTITIONS, _factory._results.size());
+ PropertyKey controllerTaskStatus =
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+ schedulerMessage.getMsgId());
+
+ int messageResultCount = 0;
+ for (int i = 0; i < 10; i++) {
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+ Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
+ .equals("" + (_PARTITIONS * 3)));
+ for (String key : statusUpdate.getMapFields().keySet()) {
+ if (key.startsWith("MessageResult ")) {
+ messageResultCount++;
+ }
+ }
+ if (messageResultCount == _PARTITIONS * 3) {
+ break;
+ } else {
+ Thread.sleep(2000);
+ }
+ }
+ Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+ int count = 0;
+ for (Set<String> val : _factory._results.values()) {
+ count += val.size();
+ }
+ Assert.assertEquals(count, _PARTITIONS * 3);
+
+ }
+
+ @Test()
public void testSchedulerMsg() throws Exception {
- // Logger.getRootLogger().setLevel(Level.INFO);
+ Logger.getRootLogger().setLevel(Level.INFO);
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
- new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
- schedulerMessage.setTgtSessionId(SessionId.from("*"));
+ new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
schedulerMessage.setTgtName("CONTROLLER");
// TODO: change it to "ADMIN" ?
schedulerMessage.setSrcName("CONTROLLER");
// schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE,
// "TestSchedulerMsg");
// Template for the individual message sent to each participant
- Message msg = new Message(_factory.getMessageType(), MessageId.from("Template"));
- msg.setTgtSessionId(SessionId.from("*"));
+ Message msg = new Message(_factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
msg.setMsgState(MessageState.NEW);
// Criteria to send individual messages
@@ -188,8 +328,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
- helixDataAccessor.createProperty(
- keyBuilder.controllerMessage(schedulerMessage.getMessageId().stringify()), schedulerMessage);
+ helixDataAccessor.createProperty(keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
+ schedulerMessage);
for (int i = 0; i < 30; i++) {
Thread.sleep(2000);
@@ -200,8 +340,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
Assert.assertEquals(_PARTITIONS, _factory._results.size());
PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
- .getMessageId().stringify());
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+ schedulerMessage.getMsgId());
int messageResultCount = 0;
for (int i = 0; i < 10; i++) {
@@ -231,11 +371,11 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
String controllerStatusPath =
HelixUtil.getControllerPropertyPath(manager.getClusterName(),
PropertyType.STATUSUPDATES_CONTROLLER);
- List<String> subPaths = _zkClient.getChildren(controllerStatusPath);
+ List<String> subPaths = _gZkClient.getChildren(controllerStatusPath);
Assert.assertTrue(subPaths.size() > 0);
for (String subPath : subPaths) {
String nextPath = controllerStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ List<String> subsubPaths = _gZkClient.getChildren(nextPath);
Assert.assertTrue(subsubPaths.size() > 0);
}
@@ -243,38 +383,38 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
HelixUtil.getInstancePropertyPath(manager.getClusterName(), "localhost_" + (START_PORT),
PropertyType.STATUSUPDATES);
- subPaths = _zkClient.getChildren(instanceStatusPath);
+ subPaths = _gZkClient.getChildren(instanceStatusPath);
Assert.assertTrue(subPaths.size() > 0);
for (String subPath : subPaths) {
String nextPath = instanceStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ List<String> subsubPaths = _gZkClient.getChildren(nextPath);
Assert.assertTrue(subsubPaths.size() > 0);
for (String subsubPath : subsubPaths) {
String nextnextPath = nextPath + "/" + subsubPath;
- Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
+ Assert.assertTrue(_gZkClient.getChildren(nextnextPath).size() > 0);
}
}
Thread.sleep(3000);
- ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
+ ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _gZkClient, 0);
dumpTask.run();
- subPaths = _zkClient.getChildren(controllerStatusPath);
+ subPaths = _gZkClient.getChildren(controllerStatusPath);
Assert.assertTrue(subPaths.size() > 0);
for (String subPath : subPaths) {
String nextPath = controllerStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ List<String> subsubPaths = _gZkClient.getChildren(nextPath);
Assert.assertTrue(subsubPaths.size() == 0);
}
- subPaths = _zkClient.getChildren(instanceStatusPath);
+ subPaths = _gZkClient.getChildren(instanceStatusPath);
Assert.assertTrue(subPaths.size() > 0);
for (String subPath : subPaths) {
String nextPath = instanceStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ List<String> subsubPaths = _gZkClient.getChildren(nextPath);
Assert.assertTrue(subsubPaths.size() > 0);
for (String subsubPath : subsubPaths) {
String nextnextPath = nextPath + "/" + subsubPath;
- Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() == 0);
+ Assert.assertTrue(_gZkClient.getChildren(nextnextPath).size() == 0);
}
}
}
@@ -284,22 +424,22 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
- new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
- schedulerMessage.setTgtSessionId(SessionId.from("*"));
+ new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
schedulerMessage.setTgtName("CONTROLLER");
// TODO: change it to "ADMIN" ?
schedulerMessage.setSrcName("CONTROLLER");
// Template for the individual message sent to each participant
- Message msg = new Message(_factory.getMessageType(), MessageId.from("Template"));
- msg.setTgtSessionId(SessionId.from("*"));
+ Message msg = new Message(_factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
msg.setMsgState(MessageState.NEW);
// Criteria to send individual messages
@@ -375,22 +515,22 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
- new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
- schedulerMessage.setTgtSessionId(SessionId.from("*"));
+ new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
schedulerMessage.setTgtName("CONTROLLER");
// TODO: change it to "ADMIN" ?
schedulerMessage.setSrcName("CONTROLLER");
// Template for the individual message sent to each participant
- Message msg = new Message(factory.getMessageType(), MessageId.from("Template"));
- msg.setTgtSessionId(SessionId.from("*"));
+ Message msg = new Message(factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
msg.setMsgState(MessageState.NEW);
// Criteria to send individual messages
@@ -416,21 +556,21 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
- PropertyKey controllerMessageKey =
- keyBuilder.controllerMessage(schedulerMessage.getMessageId().stringify());
+ PropertyKey controllerMessageKey = keyBuilder.controllerMessage(schedulerMessage.getMsgId());
helixDataAccessor.setProperty(controllerMessageKey, schedulerMessage);
Thread.sleep(3000);
Assert.assertEquals(0, factory._results.size());
-
- waitMessageUpdate("SentMessageCount", schedulerMessage.getMessageId().stringify(),
- helixDataAccessor);
PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
- .getMessageId().stringify());
- waitMessageUpdate("SentMessageCount", schedulerMessage.getMessageId().stringify(),
- helixDataAccessor);
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+ schedulerMessage.getMsgId());
+ for (int i = 0; i < 10; i++) {
+ StatusUpdate update = helixDataAccessor.getProperty(controllerTaskStatus);
+ if (update == null || update.getRecord().getMapField("SentMessageCount") == null) {
+ Thread.sleep(1000);
+ }
+ }
ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount").equals("0"));
int count = 0;
@@ -442,30 +582,28 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
@Test()
public void testSchedulerMsg3() throws Exception {
- final int avgReplicas = _PARTITIONS * 3 / 5;
-
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- _factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+ // _participants[i].getMessagingService().registerMessageHandlerFactory(
+ // _factory.getMessageType(), _factory);
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
- new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
- schedulerMessage.setTgtSessionId(SessionId.from("*"));
+ new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
schedulerMessage.setTgtName("CONTROLLER");
// TODO: change it to "ADMIN" ?
schedulerMessage.setSrcName("CONTROLLER");
// Template for the individual message sent to each participant
- Message msg = new Message(_factory.getMessageType(), MessageId.from("Template"));
- msg.setTgtSessionId(SessionId.from("*"));
+ Message msg = new Message(_factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
msg.setMsgState(MessageState.NEW);
// Criteria to send individual messages
@@ -518,7 +656,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
sw = new StringWriter();
mapper.writeValue(sw, cr);
- schedulerMessage.setMessageId(MessageId.from(UUID.randomUUID().toString()));
+ schedulerMessage.setMsgId(UUID.randomUUID().toString());
crString = sw.toString();
schedulerMessage.getRecord().setSimpleField("Criteria", crString);
manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
@@ -526,35 +664,59 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
callback._message.getResultMap().get(
DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
- Thread.sleep(1000);
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
- waitMessageUpdate("Summary", msgId, helixDataAccessor);
+ for (int j = 0; j < 100; j++) {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus =
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+ if (statusUpdate.getMapFields().containsKey("Summary")) {
+ break;
+ }
+ }
PropertyKey controllerTaskStatus =
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
- .equals("" + avgReplicas));
+ .equals("" + (_PARTITIONS * 3 / 5)));
int messageResultCount = 0;
for (String key : statusUpdate.getMapFields().keySet()) {
if (key.startsWith("MessageResult")) {
messageResultCount++;
}
}
- Assert.assertEquals(messageResultCount, avgReplicas);
+ Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
- int count = 0;
- // System.out.println(i);
- for (Set<String> val : _factory._results.values()) {
- // System.out.println(val);
- count += val.size();
+
+ boolean success = false;
+ for (int j = 0; j < 6; j++) {
+ int count = 0;
+ // System.out.println(i);
+ for (Set<String> val : _factory._results.values()) {
+ // System.out.println(val);
+ count += val.size();
+ }
+ // System.out.println(count);
+ // Assert.assertEquals(count, _PARTITIONS * 3 / 5 * (i + 1));
+ success = count == _PARTITIONS * 3 / 5 * (i + 1);
+ if (success) {
+ break;
+ }
+ Thread.sleep(500);
}
- // System.out.println(count);
+ Assert.assertTrue(success);
+ }
+ }
- Assert.assertEquals(count, avgReplicas * (i + 1));
+ private int count(TestMessagingHandlerFactory factory) {
+ int cnt = 0;
+ for (Set<String> val : factory._results.values()) {
+ cnt += val.size();
}
+ return cnt;
}
@Test()
@@ -562,25 +724,26 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- _factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+
+ // _participants[i].getMessagingService().registerMessageHandlerFactory(
+ // _factory.getMessageType(), _factory);
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
- new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
- schedulerMessage.setTgtSessionId(SessionId.from("*"));
+ new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
schedulerMessage.setTgtName("CONTROLLER");
// TODO: change it to "ADMIN" ?
schedulerMessage.setSrcName("CONTROLLER");
// Template for the individual message sent to each participant
- Message msg = new Message(_factory.getMessageType(), MessageId.from("Template"));
- msg.setTgtSessionId(SessionId.from("*"));
- msg.setMsgState(MessageState.NEW);
+ Message msgTemplate = new Message(_factory.getMessageType(), "Template");
+ msgTemplate.setTgtSessionId("*");
+ msgTemplate.setMsgState(MessageState.NEW);
// Criteria to send individual messages
Criteria cr = new Criteria();
@@ -600,7 +763,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
String crString = sw.toString();
schedulerMessage.getRecord().setSimpleField("Criteria", crString);
- schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setMapField("MessageTemplate", msgTemplate.getRecord().getSimpleFields());
schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
@@ -635,8 +798,17 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
callback._message.getResultMap()
.get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
- final HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- final Builder keyBuilder = helixDataAccessor.keyBuilder();
+ boolean success = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ return count(_factory) == 60; // TestDB number_of_partitions x replicas
+ }
+ }, 10 * 1000);
+ Assert.assertTrue(success, "If not specifying participant, controller will send 60 messages");
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
ArrayList<String> msgIds = new ArrayList<String>();
for (int i = 0; i < NODE_NR; i++) {
callback = new MockAsyncCallback();
@@ -647,25 +819,34 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
sw = new StringWriter();
mapper.writeValue(sw, cr);
- schedulerMessage.setMessageId(MessageId.from(UUID.randomUUID().toString()));
-
- // need to use a different name for scheduler_task_queue task resource
- schedulerMessage.getRecord().setSimpleField(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg4_" + i);
-
+ schedulerMessage.setMsgId(UUID.randomUUID().toString());
crString = sw.toString();
schedulerMessage.getRecord().setSimpleField("Criteria", crString);
manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+
+ Thread.sleep(5000);
+ System.err.println("count: " + count(_factory));
+
String msgId =
callback._message.getResultMap().get(
DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
msgIds.add(msgId);
}
- for (int i = 0; i < NODE_NR; i++) {
- final String msgId = msgIds.get(i);
+ // System.err.println("count: " + count(_factory));
- waitMessageUpdate("Summary", msgId, helixDataAccessor);
+ for (int i = 0; i < NODE_NR; i++) {
+ String msgId = msgIds.get(i);
+ for (int j = 0; j < 100; j++) {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus =
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+ if (statusUpdate.getMapFields().containsKey("Summary")) {
+ // System.err.println(msgId+" done");
+ break;
+ }
+ }
PropertyKey controllerTaskStatus =
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
@@ -678,11 +859,24 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
messageResultCount++;
}
}
+ if (messageResultCount != _PARTITIONS * 3 / 5) {
+ int x = 10;
+ x = x + messageResultCount;
+ }
Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
}
- waitMessageUpdate("Summary", msgIdPrime, helixDataAccessor);
+ for (int j = 0; j < 100; j++) {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus =
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgIdPrime);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+ if (statusUpdate.getMapFields().containsKey("Summary")) {
+ break;
+ }
+ }
+ // Thread.sleep(5000);
int count = 0;
for (Set<String> val : _factory._results.values()) {
// System.out.println(val);
@@ -692,31 +886,147 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
Assert.assertEquals(count, _PARTITIONS * 3 * 2);
}
- /**
- * wait message summary to appear in controller-message-status-update
- * @param msgId
- * @param accessor
- * @return
- * @throws Exception
- */
- private boolean waitMessageUpdate(final String mapKey, final String msgId,
- final HelixDataAccessor accessor) throws Exception {
- final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- return TestHelper.verify(new TestHelper.Verifier() {
+ @Test
+ public void testSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException,
+ IOException, InterruptedException {
+ TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
+ HelixManager manager = null;
+ for (int i = 0; i < NODE_NR; i++) {
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
+ factory.getMessageType(), factory);
- @Override
- public boolean verify() throws Exception {
- PropertyKey key = keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgId);
- HelixProperty statusUpdate = accessor.getProperty(key);
- if (statusUpdate == null) {
- return false;
- }
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
+ factory.getMessageType(), factory);
- if (statusUpdate.getRecord().getMapField(mapKey) == null) {
- return false;
- }
- return true;
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
+ }
+
+ Message schedulerMessage =
+ new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
+ schedulerMessage.setTgtName("CONTROLLER");
+ // TODO: change it to "ADMIN" ?
+ schedulerMessage.setSrcName("CONTROLLER");
+
+ // Template for the individual message sent to each participant
+ Message msg = new Message(factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
+ msg.setMsgState(MessageState.NEW);
+
+ // Criteria to send individual messages
+ Criteria cr = new Criteria();
+ cr.setInstanceName("localhost_%");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ cr.setResource("%");
+ cr.setPartition("%");
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ String crString = sw.toString();
+
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+ schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+ schedulerMessage.getRecord().setSimpleField(
+ DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
+
+ Criteria cr2 = new Criteria();
+ cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+ cr2.setInstanceName("*");
+ cr2.setSessionSpecific(false);
+
+ MockAsyncCallback callback = new MockAsyncCallback();
+ mapper = new ObjectMapper();
+ serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+ // Set contraints that only 1 msg per participant
+ Map<String, String> constraints = new TreeMap<String, String>();
+ constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
+ constraints.put("TRANSITION", "OFFLINE-COMPLETED");
+ constraints.put("CONSTRAINT_VALUE", "1");
+ constraints.put("INSTANCE", ".*");
+ manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
+ ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints));
+
+ // Send scheduler message
+ crString = sw.toString();
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+ String msgId =
+ callback._message.getResultMap()
+ .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+
+ for (int j = 0; j < 10; j++) {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus =
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+ if (statusUpdate.getMapFields().containsKey("SentMessageCount")) {
+ Assert.assertEquals(
+ statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""
+ + (_PARTITIONS * 3));
+ break;
}
- }, 20 * 1000);
+ }
+
+ for (int i = 0; i < _PARTITIONS * 3 / 5; i++) {
+ for (int j = 0; j < 10; j++) {
+ Thread.sleep(300);
+ if (factory._messageCount == 5 * (i + 1))
+ break;
+ }
+ Thread.sleep(300);
+ Assert.assertEquals(factory._messageCount, 5 * (i + 1));
+ factory.signal();
+ // System.err.println(i);
+ }
+
+ for (int j = 0; j < 10; j++) {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus =
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+ if (statusUpdate.getMapFields().containsKey("Summary")) {
+ break;
+ }
+ }
+
+ Assert.assertEquals(_PARTITIONS, factory._results.size());
+ PropertyKey controllerTaskStatus =
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+ Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
+ .equals("" + (_PARTITIONS * 3)));
+ int messageResultCount = 0;
+ for (String key : statusUpdate.getMapFields().keySet()) {
+ if (key.startsWith("MessageResult ")) {
+ messageResultCount++;
+ }
+ }
+ Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+
+ int count = 0;
+ for (Set<String> val : factory._results.values()) {
+ count += val.size();
+ }
+ Assert.assertEquals(count, _PARTITIONS * 3);
+
+ manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
+ ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
+
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
deleted file mode 100644
index b887fe7..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
+++ /dev/null
@@ -1,254 +0,0 @@
-package org.apache.helix.integration;
-
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.Criteria;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.messaging.AsyncCallback;
-import org.apache.helix.messaging.handling.HelixTaskResult;
-import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.ConstraintItem;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestSchedulerMsgContraints extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-
- class MockAsyncCallback extends AsyncCallback {
- Message _message;
-
- public MockAsyncCallback() {
- }
-
- @Override
- public void onTimeOut() {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void onReplyMessage(Message message) {
- _message = message;
- }
- }
-
- public static class TestMessagingHandlerFactoryLatch implements MessageHandlerFactory {
- public volatile CountDownLatch _latch = new CountDownLatch(1);
- public int _messageCount = 0;
- public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
-
- @Override
- public synchronized MessageHandler createHandler(Message message, NotificationContext context) {
- _messageCount++;
- return new TestMessagingHandlerLatch(message, context);
- }
-
- public synchronized void signal() {
- _latch.countDown();
- _latch = new CountDownLatch(1);
- }
-
- @Override
- public String getMessageType() {
- return "TestMessagingHandlerLatch";
- }
-
- @Override
- public void reset() {
- // TODO Auto-generated method stub
- }
-
- public class TestMessagingHandlerLatch extends MessageHandler {
- public TestMessagingHandlerLatch(Message message, NotificationContext context) {
- super(message, context);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException {
- _latch.await();
- HelixTaskResult result = new HelixTaskResult();
- result.setSuccess(true);
- result.getTaskResultMap().put("Message", _message.getMessageId().stringify());
- String destName = _message.getTgtName();
- synchronized (_results) {
- if (!_results.containsKey(_message.getPartitionId().stringify())) {
- _results
- .put(_message.getPartitionId().stringify(), new ConcurrentSkipListSet<String>());
- }
- }
- _results.get(_message.getPartitionId().stringify()).add(destName);
- // System.err.println("Message " + _message.getMsgId() + " executed");
- return result;
- }
-
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type) {
- // TODO Auto-generated method stub
- }
- }
- }
-
- @Test
- public void testSchedulerMsgContraints() throws Exception {
- TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
- HelixManager manager = null;
- for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
- //
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
- manager = _startCMResultMap.get(hostDest)._manager;
- }
-
- Message schedulerMessage =
- new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
- schedulerMessage.setTgtSessionId(SessionId.from("*"));
- schedulerMessage.setTgtName("CONTROLLER");
- // TODO: change it to "ADMIN" ?
- schedulerMessage.setSrcName("CONTROLLER");
-
- // Template for the individual message sent to each participant
- Message msg = new Message(factory.getMessageType(), MessageId.from("Template"));
- msg.setTgtSessionId(SessionId.from("*"));
- msg.setMsgState(MessageState.NEW);
-
- // Criteria to send individual messages
- Criteria cr = new Criteria();
- cr.setInstanceName("localhost_%");
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
- cr.setResource("%");
- cr.setPartition("%");
-
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
- StringWriter sw = new StringWriter();
- mapper.writeValue(sw, cr);
-
- String crString = sw.toString();
-
- schedulerMessage.getRecord().setSimpleField("Criteria", crString);
- schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
- schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
- schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
- schedulerMessage.getRecord().setSimpleField(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
-
- Criteria cr2 = new Criteria();
- cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
- cr2.setInstanceName("*");
- cr2.setSessionSpecific(false);
-
- MockAsyncCallback callback = new MockAsyncCallback();
- mapper = new ObjectMapper();
- serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
- sw = new StringWriter();
- mapper.writeValue(sw, cr);
-
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
-
- // Set contraints that only 1 msg per participant
- Map<String, String> constraints = new TreeMap<String, String>();
- constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
- constraints.put("TRANSITION", "OFFLINE-COMPLETED");
- constraints.put("CONSTRAINT_VALUE", "1");
- constraints.put("INSTANCE", ".*");
- manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
- ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints));
-
- // Send scheduler message
- crString = sw.toString();
- schedulerMessage.getRecord().setSimpleField("Criteria", crString);
- manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
- String msgId =
- callback._message.getResultMap()
- .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
-
- for (int j = 0; j < 10; j++) {
- Thread.sleep(200);
- PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
- if (statusUpdate.getMapFields().containsKey("SentMessageCount")) {
- Assert.assertEquals(
- statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""
- + (_PARTITIONS * 3));
- break;
- }
- }
-
- for (int i = 0; i < _PARTITIONS * 3 / 5; i++) {
- for (int j = 0; j < 10; j++) {
- Thread.sleep(300);
- if (factory._messageCount == 5 * (i + 1))
- break;
- }
- Thread.sleep(300);
- Assert.assertEquals(factory._messageCount, 5 * (i + 1));
- factory.signal();
- // System.err.println(i);
- }
-
- for (int j = 0; j < 10; j++) {
- Thread.sleep(200);
- PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
- if (statusUpdate.getMapFields().containsKey("Summary")) {
- break;
- }
- }
-
- Assert.assertEquals(_PARTITIONS, factory._results.size());
- PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
- Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
- .equals("" + (_PARTITIONS * 3)));
- int messageResultCount = 0;
- for (String key : statusUpdate.getMapFields().keySet()) {
- if (key.startsWith("MessageResult ")) {
- messageResultCount++;
- }
- }
- Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-
- int count = 0;
- for (Set<String> val : factory._results.values()) {
- count += val.size();
- }
- Assert.assertEquals(count, _PARTITIONS * 3);
-
- manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
- ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
deleted file mode 100644
index bbaa18d..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package org.apache.helix.integration;
-
-import java.io.StringWriter;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.helix.Criteria;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.messaging.handling.HelixTaskResult;
-import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestSchedulerMsgUsingQueue extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
- public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
- int cnt;
- public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
-
- public TestMessagingHandlerFactory() {
- super();
- cnt = 0;
- }
-
- @Override
- public MessageHandler createHandler(Message message, NotificationContext context) {
- // System.out.println("\t create-hdlr: " + message.getId());
- return new TestMessagingHandler(message, context);
- }
-
- @Override
- public String getMessageType() {
- return "TestParticipant";
- }
-
- @Override
- public void reset() {
- // TODO Auto-generated method stub
-
- }
-
- public class TestMessagingHandler extends MessageHandler {
- public TestMessagingHandler(Message message, NotificationContext context) {
- super(message, context);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException {
- HelixTaskResult result = new HelixTaskResult();
- result.setSuccess(true);
- // String tgtName = _message.getTgtName();
- String messageId = _message.getMessageId().stringify();
- String partitionId = _message.getPartitionId().stringify();
-
- result.getTaskResultMap().put("Message", messageId);
- synchronized (_results) {
- if (!_results.containsKey(partitionId)) {
- _results.put(partitionId, new HashSet<String>());
- }
- _results.get(partitionId).add(messageId);
- }
- cnt++;
- // System.err.println(cnt + ": message " + messageId + ", tgtName: " + tgtName
- // + ", partitionId: " + partitionId);
- return result;
- }
-
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type) {
- // TODO Auto-generated method stub
- }
- }
- }
-
- @Test()
- public void testSchedulerMsgUsingQueue() throws Exception {
- // Logger.getRootLogger().setLevel(Level.INFO);
- // _factory._results.clear();
- TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-
- HelixManager manager = null;
- for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
- manager = _startCMResultMap.get(hostDest)._manager;
- }
-
- Message schedulerMessage =
- new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
- schedulerMessage.setTgtSessionId(SessionId.from("*"));
- schedulerMessage.setTgtName("CONTROLLER");
- // TODO: change it to "ADMIN" ?
- schedulerMessage.setSrcName("CONTROLLER");
- schedulerMessage.getRecord().setSimpleField(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgUsingQueue");
- // Template for the individual message sent to each participant
- Message msg = new Message(factory.getMessageType(), MessageId.from("Template"));
- msg.setTgtSessionId(SessionId.from("*"));
- msg.setMsgState(MessageState.NEW);
-
- // Criteria to send individual messages
- Criteria cr = new Criteria();
- cr.setInstanceName("localhost_%");
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
- cr.setResource("%");
- cr.setPartition("%");
-
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
- StringWriter sw = new StringWriter();
- mapper.writeValue(sw, cr);
-
- String crString = sw.toString();
-
- schedulerMessage.getRecord().setSimpleField("Criteria", crString);
- schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
- schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
- helixDataAccessor.createProperty(
- keyBuilder.controllerMessage(schedulerMessage.getMessageId().stringify()), schedulerMessage);
-
- for (int i = 0; i < 30; i++) {
- Thread.sleep(2000);
- if (_PARTITIONS == factory._results.size()) {
- break;
- }
- }
-
- Assert.assertEquals(_PARTITIONS, factory._results.size());
- PropertyKey controllerTaskStatus =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
- .getMessageId().stringify());
-
- int messageResultCount = 0;
- for (int i = 0; i < 10; i++) {
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
- Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
- .equals("" + (_PARTITIONS * 3)));
- for (String key : statusUpdate.getMapFields().keySet()) {
- if (key.startsWith("MessageResult ")) {
- messageResultCount++;
- }
- }
- if (messageResultCount == _PARTITIONS * 3) {
- break;
- } else {
- Thread.sleep(2000);
- }
- }
- Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
- int count = 0;
- for (Set<String> val : factory._results.values()) {
- count += val.size();
- }
- Assert.assertEquals(count, _PARTITIONS * 3);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
index 3024f45..a927520 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
@@ -27,10 +27,10 @@ import org.apache.helix.HelixConstants;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -46,7 +46,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
String clusterName = className + "_" + methodName;
int n = 5;
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -70,14 +70,15 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
Arrays.asList(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString()));
accessor.setProperty(key, idealState);
- ClusterController controller = new ClusterController(clusterName, "controller", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
controller.syncStart();
// start n-1 participants
for (int i = 1; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -87,7 +88,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// start the remaining 1 participant
- participants[0] = new MockParticipant(clusterName, "localhost_12918", ZK_ADDR, null);
+ participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
participants[0].syncStart();
// make sure we have all participants in MASTER state
@@ -107,6 +108,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
}
// clean up
+ controller.syncStop();
for (int i = 0; i < n; i++) {
participants[i].syncStop();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
index 1b69572..4abb519 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
@@ -22,31 +22,29 @@ package org.apache.helix.integration;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
+ private static Logger LOG = Logger.getLogger(TestSessionExpiryInTransition.class);
public class SessionExpiryTransition extends MockTransition {
private final AtomicBoolean _done = new AtomicBoolean();
@Override
public void doTransition(Message message, NotificationContext context) {
- ZkHelixTestManager manager = (ZkHelixTestManager) context.getManager();
+ MockParticipantManager manager = (MockParticipantManager) context.getManager();
String instance = message.getTgtName();
PartitionId partition = message.getPartitionId();
@@ -57,8 +55,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
try {
ZkTestHelper.expireSession(manager.getZkClient());
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Exception expire zk-session", e);
}
}
}
@@ -66,7 +63,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
@Test
public void testSessionExpiryInTransition() throws Exception {
- Logger.getRootLogger().setLevel(Level.WARN);
+ // Logger.getRootLogger().setLevel(Level.WARN);
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
@@ -74,7 +71,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -86,15 +83,15 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- ZkHelixTestManager manager =
- new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
- participants[i] = new MockParticipant(manager, new SessionExpiryTransition());
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new SessionExpiryTransition());
participants[i].syncStart();
}
@@ -104,13 +101,11 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
- Thread.sleep(2000);
- controller.syncStop();
-
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
index 02a34d8..6eb7a8c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
@@ -21,9 +21,15 @@ package org.apache.helix.integration;
import java.util.Date;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.TestHelper.Verifier;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
@@ -35,16 +41,34 @@ public class TestStandAloneCMMain extends ZkStandAloneCMTestBase {
@Test()
public void testStandAloneCMMain() throws Exception {
logger.info("RUN testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
-
+ ClusterControllerManager newController = null;
for (int i = 1; i <= 2; i++) {
String controllerName = "controller_" + i;
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
+ newController =
+ new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ newController.syncStart();
}
- stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
+ // stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
+ _controller.syncStop();
+
+ final HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ final String newControllerName = newController.getInstanceName();
+ TestHelper.verify(new Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ if (leader == null) {
+ return false;
+ }
+ return leader.getInstanceName().equals(newControllerName);
+
+ }
+ }, 30 * 1000);
+
boolean result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
ZK_ADDR, CLUSTER_NAME));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
index 81c08ed..dad998d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
@@ -21,11 +21,10 @@ package org.apache.helix.integration;
import java.util.Date;
-import org.apache.helix.InstanceType;
import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
@@ -47,18 +46,16 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, PARTICIPANT_PREFIX, "TestDB", 1, 20, 5, 3,
"MasterSlave", true);
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- ZkHelixTestManager manager =
- new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
- participants[i] = new MockParticipant(manager, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
- ZkHelixTestManager controller =
- new ZkHelixTestManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
- controller.connect();
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
boolean result;
result =
@@ -67,7 +64,7 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// participant session expiry
- ZkHelixTestManager participantToExpire = participants[1].getManager();
+ MockParticipantManager participantToExpire = participants[1];
System.out.println("Expire participant session");
String oldSessionId = participantToExpire.getSessionId();
@@ -107,8 +104,7 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
// clean up
System.out.println("Clean up ...");
// Logger.getRootLogger().setLevel(Level.DEBUG);
- controller.disconnect();
- Thread.sleep(100);
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
index dce3fd4..d191c18 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
@@ -25,7 +25,7 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -54,10 +54,10 @@ public class TestStartMultipleControllersWithSameName extends ZkIntegrationTestB
// rebalance
// start controller
- ClusterController[] controllers = new ClusterController[4];
+ ClusterControllerManager[] controllers = new ClusterControllerManager[4];
for (int i = 0; i < 4; i++) {
- controllers[i] = new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controllers[i].start();
+ controllers[i] = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controllers[i].syncStart();
}
Thread.sleep(500); // wait leader election finishes
@@ -69,7 +69,6 @@ public class TestStartMultipleControllersWithSameName extends ZkIntegrationTestB
// clean up
for (int i = 0; i < 4; i++) {
controllers[i].syncStop();
- Thread.sleep(1000); // wait for all zk callbacks done
}
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index aff40b3..318177a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -29,17 +29,13 @@ import java.util.Set;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
import org.apache.helix.mock.participant.MockMSStateModel;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.mock.participant.SleepTransition;
import org.apache.helix.model.ExternalView;
@@ -60,15 +56,14 @@ import org.testng.annotations.Test;
public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
private static Logger LOG = Logger.getLogger(TestStateTransitionTimeout.class);
+ @Override
@BeforeClass
public void beforeClass() throws Exception {
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
String namespace = "/" + CLUSTER_NAME;
- if (_zkClient.exists(namespace)) {
- _zkClient.deleteRecursive(namespace);
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
}
_setupTool = new ClusterSetup(ZK_ADDR);
@@ -107,12 +102,14 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
_sleep = sleep;
}
+ @Override
@Transition(to = "SLAVE", from = "OFFLINE")
public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
LOG.info("Become SLAVE from OFFLINE");
}
+ @Override
@Transition(to = "MASTER", from = "SLAVE")
public void onBecomeMasterFromSlave(Message message, NotificationContext context)
throws InterruptedException {
@@ -122,23 +119,27 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
}
}
+ @Override
@Transition(to = "SLAVE", from = "MASTER")
public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
LOG.info("Become SLAVE from MASTER");
}
+ @Override
@Transition(to = "OFFLINE", from = "SLAVE")
public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
LOG.info("Become OFFLINE from SLAVE");
}
+ @Override
@Transition(to = "DROPPED", from = "OFFLINE")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
LOG.info("Become DROPPED from OFFLINE");
}
+ @Override
public void rollbackOnError(Message message, NotificationContext context,
StateTransitionError error) {
_error = error;
@@ -172,7 +173,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
@Test
public void testStateTransitionTimeOut() throws Exception {
Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
- MockParticipant[] participants = new MockParticipant[NODE_NR];
+ // MockParticipantManager[] participants = new MockParticipantManager[NODE_NR];
IdealState idealState =
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
for (int i = 0; i < NODE_NR; i++) {
@@ -185,19 +186,20 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
}
}
- participants[i] = new MockParticipant(factory, CLUSTER_NAME, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", factory);
+ _participants[i].syncStart();
}
String controllerName = CONTROLLER_PREFIX + "_0";
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
+ _controller =
+ new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
boolean result =
ClusterStateVerifier
.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
Assert.assertTrue(result);
- HelixDataAccessor accessor = participants[0].getManager().getHelixDataAccessor();
+ HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
Builder kb = accessor.keyBuilder();
ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
index b2a5719..6cce716 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
@@ -21,9 +21,8 @@ package org.apache.helix.integration;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -33,8 +32,7 @@ import org.testng.annotations.Test;
public class TestSwapInstance extends ZkStandAloneCMTestBase {
@Test
public void TestSwap() throws Exception {
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller;
HelixDataAccessor helixAccessor = manager.getHelixDataAccessor();
_setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 64, STATE_MODEL);
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica);
@@ -49,7 +47,7 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
idealStateOld2.merge(is2.getRecord());
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+ ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
_setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
boolean result =
@@ -68,8 +66,7 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
}
Assert.assertTrue(exception);
- _startCMResultMap.get(instanceName)._manager.disconnect();
- _startCMResultMap.get(instanceName)._thread.interrupt();
+ _participants[0].syncStop();
Thread.sleep(1000);
exception = false;
@@ -80,8 +77,9 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
exception = true;
}
Assert.assertFalse(exception);
- StartCMResult result2 = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName2);
- _startCMResultMap.put(instanceName2, result2);
+ MockParticipantManager newParticipant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName2);
+ newParticipant.syncStart();
result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(