You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/03 21:44:02 UTC
[2/5] helix git commit: [HELIX-669] State Transition Cancellation
Client side change Part II
http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 2f78007..5304a45 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -29,12 +29,15 @@ import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.Mocks;
import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableList;
+
public class TestHelixTaskExecutor {
public static class MockClusterManager extends Mocks.MockManager {
@Override
@@ -85,6 +88,10 @@ public class TestHelixTaskExecutor {
return "TestingMessageHandler";
}
+ @Override public List<String> getMessageTypes() {
+ return ImmutableList.of("TestingMessageHandler"); // one-element list: the only type this test factory handles
+ }
+
@Override
public void reset() {
// TODO Auto-generated method stub
@@ -99,6 +106,12 @@ public class TestHelixTaskExecutor {
return "TestingMessageHandler2";
}
+ @Override
+ public List<String> getMessageTypes() {
+ // This test factory handles exactly one message type.
+ return ImmutableList.of("TestingMessageHandler2");
+ }
+
}
class CancellableHandlerFactory implements MessageHandlerFactory {
@@ -165,6 +178,10 @@ public class TestHelixTaskExecutor {
return "Cancellable";
}
+ @Override public List<String> getMessageTypes() {
+ return ImmutableList.of("Cancellable"); // one-element list: the only type this test factory handles
+ }
+
@Override
public void reset() {
// TODO Auto-generated method stub
@@ -186,26 +203,36 @@ public class TestHelixTaskExecutor {
super(message, context);
}
- @Override public HelixTaskResult handleMessage() throws InterruptedException {
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException {
HelixTaskResult result = new HelixTaskResult();
_processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
result.setSuccess(true);
return result;
}
- @Override public void onError(Exception e, ErrorCode code, ErrorType type) {
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type) {
}
}
- @Override public MessageHandler createHandler(Message message, NotificationContext context) {
+ @Override
+ public MessageHandler createHandler(Message message, NotificationContext context) {
return new TestStateTransitionMessageHandler(message, context);
}
- @Override public String getMessageType() {
+ @Override
+ public String getMessageType() {
return _msgType;
}
- @Override public void reset() {
+ @Override
+ public List<String> getMessageTypes() {
+ return ImmutableList.of(_msgType); // wraps the factory's single configured type in an immutable list
+ }
+
+ @Override
+ public void reset() {
}
}
@@ -217,17 +244,21 @@ public class TestHelixTaskExecutor {
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+ for (String type : factory.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory);
+ }
TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
- executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
+ for (String type : factory2.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory2);
+ }
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs1 = 5;
for (int i = 0; i < nMsgs1; i++) {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
@@ -237,7 +268,7 @@ public class TestHelixTaskExecutor {
int nMsgs2 = 6;
for (int i = 0; i < nMsgs2; i++) {
- Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
@@ -269,7 +300,9 @@ public class TestHelixTaskExecutor {
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+ for (String type : factory.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory);
+ }
TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
@@ -278,7 +311,7 @@ public class TestHelixTaskExecutor {
int nMsgs1 = 5;
for (int i = 0; i < nMsgs1; i++) {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
@@ -287,7 +320,7 @@ public class TestHelixTaskExecutor {
int nMsgs2 = 4;
for (int i = 0; i < nMsgs2; i++) {
- Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
@@ -303,7 +336,7 @@ public class TestHelixTaskExecutor {
AssertJUnit.assertTrue(factory2._handlersCreated == 0);
for (Message message : msgList) {
- if (message.getMsgType().equalsIgnoreCase(factory.getMessageType())) {
+ if (factory.getMessageTypes().contains(message.getMsgType())) {
AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
}
}
@@ -315,17 +348,21 @@ public class TestHelixTaskExecutor {
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+ for (String type : factory.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory);
+ }
TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
- executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
+ for (String type : factory2.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory2);
+ }
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs1 = 5;
for (int i = 0; i < nMsgs1; i++) {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("");
msgList.add(msg);
@@ -333,7 +370,7 @@ public class TestHelixTaskExecutor {
int nMsgs2 = 4;
for (int i = 0; i < nMsgs2; i++) {
- Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("some other session id");
msg.setTgtName("");
msgList.add(msg);
@@ -348,7 +385,7 @@ public class TestHelixTaskExecutor {
AssertJUnit.assertTrue(factory2._handlersCreated == 0);
for (Message message : msgList) {
- if (message.getMsgType().equalsIgnoreCase(factory.getMessageType())) {
+ if (factory.getMessageTypes().contains(message.getMsgType())) {
AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
}
}
@@ -361,21 +398,23 @@ public class TestHelixTaskExecutor {
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+ for (String type : factory.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory);
+ }
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs1 = 5;
for (int i = 0; i < nMsgs1; i++) {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setCorrelationId(UUID.randomUUID().toString());
msgList.add(msg);
}
- Message exceptionMsg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ Message exceptionMsg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
exceptionMsg.setTgtSessionId(manager.getSessionId());
exceptionMsg.setMsgSubType("EXCEPTION");
exceptionMsg.setTgtName("Localhost_1123");
@@ -400,14 +439,15 @@ public class TestHelixTaskExecutor {
HelixManager manager = new MockClusterManager();
CancellableHandlerFactory factory = new CancellableHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
+ for (String type : factory.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory);
+ }
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs1 = 0;
for (int i = 0; i < nMsgs1; i++) {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
@@ -417,7 +457,7 @@ public class TestHelixTaskExecutor {
List<Message> msgListToCancel = new ArrayList<Message>();
int nMsgs2 = 4;
for (int i = 0; i < nMsgs2; i++) {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msgList.add(msg);
msg.setTgtName("Localhost_1123");
@@ -439,7 +479,7 @@ public class TestHelixTaskExecutor {
AssertJUnit.assertTrue(factory._processingMsgIds.size() == nMsgs1 + nMsgs2);
for (Message message : msgList) {
- if (message.getMsgType().equalsIgnoreCase(factory.getMessageType())) {
+ if (factory.getMessageTypes().contains(message.getMsgType())) {
AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(message.getId()));
}
}
@@ -452,18 +492,23 @@ public class TestHelixTaskExecutor {
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
+ for (String type : factory.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory);
+ }
TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
- executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
-
+ for (String type : factory2.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory2);
+ }
CancellableHandlerFactory factory3 = new CancellableHandlerFactory();
- executor.registerMessageHandlerFactory(factory3.getMessageType(), factory3);
+ for (String type : factory3.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory3);
+ }
+
int nMsg1 = 10, nMsg2 = 10, nMsg3 = 10;
List<Message> msgList = new ArrayList<Message>();
for (int i = 0; i < nMsg1; i++) {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
@@ -471,7 +516,7 @@ public class TestHelixTaskExecutor {
}
for (int i = 0; i < nMsg2; i++) {
- Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msgList.add(msg);
msg.setTgtName("Localhost_1123");
@@ -480,7 +525,7 @@ public class TestHelixTaskExecutor {
}
for (int i = 0; i < nMsg3; i++) {
- Message msg = new Message(factory3.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory3.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msgList.add(msg);
msg.setTgtName("Localhost_1123");
@@ -509,15 +554,16 @@ public class TestHelixTaskExecutor {
HelixManager manager = new MockClusterManager();
CancellableHandlerFactory factory = new CancellableHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
+ for (String type : factory.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory);
+ }
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs2 = 4;
// Test the case in which retry = 0
for (int i = 0; i < nMsgs2; i++) {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
@@ -532,7 +578,7 @@ public class TestHelixTaskExecutor {
AssertJUnit.assertEquals(factory._timedOutMsgIds.size(), 2);
// AssertJUnit.assertFalse(msgList.get(0).getRecord().getSimpleFields().containsKey("TimeOut"));
for (int i = 0; i < nMsgs2 - 2; i++) {
- if (msgList.get(i).getMsgType().equalsIgnoreCase(factory.getMessageType())) {
+ if (factory.getMessageTypes().contains(msgList.get(i).getMsgType())) {
AssertJUnit.assertTrue(msgList.get(i).getRecord().getSimpleFields()
.containsKey("Cancelcount"));
AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
@@ -550,8 +596,9 @@ public class TestHelixTaskExecutor {
HelixManager manager = new MockClusterManager();
CancellableHandlerFactory factory = new CancellableHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
+ for (String type : factory.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory);
+ }
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
@@ -561,7 +608,7 @@ public class TestHelixTaskExecutor {
// Test the case that the message are executed for the second time
int nMsgs2 = 4;
for (int i = 0; i < nMsgs2; i++) {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
@@ -615,6 +662,5 @@ public class TestHelixTaskExecutor {
Thread.sleep(3000);
AssertJUnit.assertEquals(cancelFactory._processedMsgIds.size(), 0);
AssertJUnit.assertEquals(stateTransitionFactory._processedMsgIds.size(), 0);
-
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java
index f1848a5..a4fcec6 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java
@@ -1,5 +1,10 @@
package org.apache.helix.mock.participant;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
@@ -31,15 +36,15 @@ import org.apache.log4j.Logger;
})
public class MockDelayMSStateModel extends StateModel {
private static Logger LOG = Logger.getLogger(MockDelayMSStateModel.class);
-
private long _delay;
public MockDelayMSStateModel(long delay) {
_delay = delay;
+ _cancelled = false;
}
@Transition(to = "SLAVE", from = "OFFLINE")
- public void onBecomeSLAVEFromOffline(Message message, NotificationContext context) {
+ public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
if (_delay > 0) {
try {
Thread.sleep(_delay);
@@ -50,10 +55,17 @@ public class MockDelayMSStateModel extends StateModel {
LOG.info("Become SLAVE from OFFLINE");
}
- @Transition(to = "ONLINE", from = "SLAVE")
- public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
- LOG.info("Become ONLINE from SLAVE");
+ @Transition(to = "MASTER", from = "SLAVE")
+ public void onBecomeMasterFromSlave(Message message, NotificationContext context)
+ throws InterruptedException {
+ if (_delay < 0) { // negative delay: stall this transition instead of OFFLINE->SLAVE (which sleeps when _delay > 0)
+ Thread.sleep(Math.abs(_delay)); // sleep for the magnitude of the configured delay, in milliseconds
+ }
+ LOG.info("Become MASTER from SLAVE"); // routine transition: logged at INFO like the sibling transitions
+ }
-
+ @Transition(to = "OFFLINE", from = "SLAVE")
+ public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
+ LOG.info("Become OFFLINE from SLAVE"); // no artificial delay on this transition; it only logs
+ }
}