You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/03 21:44:02 UTC

[2/5] helix git commit: [HELIX-669] State Transition Cancellation Client side change Part II

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 2f78007..5304a45 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -29,12 +29,15 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.Mocks;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+
 public class TestHelixTaskExecutor {
   public static class MockClusterManager extends Mocks.MockManager {
     @Override
@@ -85,6 +88,10 @@ public class TestHelixTaskExecutor {
       return "TestingMessageHandler";
     }
 
+    @Override public List<String> getMessageTypes() {
+      return ImmutableList.of("TestingMessageHandler");
+    }
+
     @Override
     public void reset() {
       // TODO Auto-generated method stub
@@ -99,6 +106,12 @@ public class TestHelixTaskExecutor {
       return "TestingMessageHandler2";
     }
 
+    @Override
+    public List<String> getMessageTypes() {
+      // Advertises the same single type that getMessageType() returns.
+      return ImmutableList.of("TestingMessageHandler2");
+    }
+
   }
 
   class CancellableHandlerFactory implements MessageHandlerFactory {
@@ -165,6 +178,10 @@ public class TestHelixTaskExecutor {
       return "Cancellable";
     }
 
+    @Override public List<String> getMessageTypes() {
+      return ImmutableList.of("Cancellable");
+    }
+
     @Override
     public void reset() {
       // TODO Auto-generated method stub
@@ -186,26 +203,36 @@ public class TestHelixTaskExecutor {
         super(message, context);
       }
 
-      @Override public HelixTaskResult handleMessage() throws InterruptedException {
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
         HelixTaskResult result = new HelixTaskResult();
         _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
         result.setSuccess(true);
         return result;
       }
 
-      @Override public void onError(Exception e, ErrorCode code, ErrorType type) {
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type) {
       }
     }
 
-    @Override public MessageHandler createHandler(Message message, NotificationContext context) {
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
       return new TestStateTransitionMessageHandler(message, context);
     }
 
-    @Override public String getMessageType() {
+    @Override
+    public String getMessageType() {
       return _msgType;
     }
 
-    @Override public void reset() {
+    @Override
+    public List<String> getMessageTypes() {
+      return ImmutableList.of(_msgType);
+    }
+
+    @Override
+    public void reset() {
 
     }
   }
@@ -217,17 +244,21 @@ public class TestHelixTaskExecutor {
     HelixManager manager = new MockClusterManager();
 
     TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
 
     TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
-    executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
+    for (String type : factory2.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory2);
+    }
 
     NotificationContext changeContext = new NotificationContext(manager);
     List<Message> msgList = new ArrayList<Message>();
 
     int nMsgs1 = 5;
     for (int i = 0; i < nMsgs1; i++) {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId(manager.getSessionId());
       msg.setTgtName("Localhost_1123");
       msg.setSrcName("127.101.1.23_2234");
@@ -237,7 +268,7 @@ public class TestHelixTaskExecutor {
 
     int nMsgs2 = 6;
     for (int i = 0; i < nMsgs2; i++) {
-      Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId(manager.getSessionId());
       msg.setTgtName("Localhost_1123");
       msg.setSrcName("127.101.1.23_2234");
@@ -269,7 +300,9 @@ public class TestHelixTaskExecutor {
     HelixManager manager = new MockClusterManager();
 
     TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
 
     TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
 
@@ -278,7 +311,7 @@ public class TestHelixTaskExecutor {
 
     int nMsgs1 = 5;
     for (int i = 0; i < nMsgs1; i++) {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId(manager.getSessionId());
       msg.setTgtName("Localhost_1123");
       msg.setSrcName("127.101.1.23_2234");
@@ -287,7 +320,7 @@ public class TestHelixTaskExecutor {
 
     int nMsgs2 = 4;
     for (int i = 0; i < nMsgs2; i++) {
-      Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId(manager.getSessionId());
       msg.setTgtName("Localhost_1123");
       msg.setSrcName("127.101.1.23_2234");
@@ -303,7 +336,7 @@ public class TestHelixTaskExecutor {
     AssertJUnit.assertTrue(factory2._handlersCreated == 0);
 
     for (Message message : msgList) {
-      if (message.getMsgType().equalsIgnoreCase(factory.getMessageType())) {
+      if (factory.getMessageTypes().contains(message.getMsgType())) {
         AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
       }
     }
@@ -315,17 +348,21 @@ public class TestHelixTaskExecutor {
     HelixManager manager = new MockClusterManager();
 
     TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
 
     TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
-    executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
+    for (String type : factory2.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory2);
+    }
 
     NotificationContext changeContext = new NotificationContext(manager);
     List<Message> msgList = new ArrayList<Message>();
 
     int nMsgs1 = 5;
     for (int i = 0; i < nMsgs1; i++) {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId("*");
       msg.setTgtName("");
       msgList.add(msg);
@@ -333,7 +370,7 @@ public class TestHelixTaskExecutor {
 
     int nMsgs2 = 4;
     for (int i = 0; i < nMsgs2; i++) {
-      Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId("some other session id");
       msg.setTgtName("");
       msgList.add(msg);
@@ -348,7 +385,7 @@ public class TestHelixTaskExecutor {
     AssertJUnit.assertTrue(factory2._handlersCreated == 0);
 
     for (Message message : msgList) {
-      if (message.getMsgType().equalsIgnoreCase(factory.getMessageType())) {
+      if (factory.getMessageTypes().contains(message.getMsgType())) {
         AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
       }
     }
@@ -361,21 +398,23 @@ public class TestHelixTaskExecutor {
     HelixManager manager = new MockClusterManager();
 
     TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
 
     NotificationContext changeContext = new NotificationContext(manager);
     List<Message> msgList = new ArrayList<Message>();
 
     int nMsgs1 = 5;
     for (int i = 0; i < nMsgs1; i++) {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId(manager.getSessionId());
       msg.setTgtName("Localhost_1123");
       msg.setSrcName("127.101.1.23_2234");
       msg.setCorrelationId(UUID.randomUUID().toString());
       msgList.add(msg);
     }
-    Message exceptionMsg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+    Message exceptionMsg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
     exceptionMsg.setTgtSessionId(manager.getSessionId());
     exceptionMsg.setMsgSubType("EXCEPTION");
     exceptionMsg.setTgtName("Localhost_1123");
@@ -400,14 +439,15 @@ public class TestHelixTaskExecutor {
     HelixManager manager = new MockClusterManager();
 
     CancellableHandlerFactory factory = new CancellableHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
     NotificationContext changeContext = new NotificationContext(manager);
     List<Message> msgList = new ArrayList<Message>();
 
     int nMsgs1 = 0;
     for (int i = 0; i < nMsgs1; i++) {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId("*");
       msg.setTgtName("Localhost_1123");
       msg.setSrcName("127.101.1.23_2234");
@@ -417,7 +457,7 @@ public class TestHelixTaskExecutor {
     List<Message> msgListToCancel = new ArrayList<Message>();
     int nMsgs2 = 4;
     for (int i = 0; i < nMsgs2; i++) {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId("*");
       msgList.add(msg);
       msg.setTgtName("Localhost_1123");
@@ -439,7 +479,7 @@ public class TestHelixTaskExecutor {
     AssertJUnit.assertTrue(factory._processingMsgIds.size() == nMsgs1 + nMsgs2);
 
     for (Message message : msgList) {
-      if (message.getMsgType().equalsIgnoreCase(factory.getMessageType())) {
+      if (factory.getMessageTypes().contains(message.getMsgType())) {
         AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(message.getId()));
       }
     }
@@ -452,18 +492,23 @@ public class TestHelixTaskExecutor {
     HelixManager manager = new MockClusterManager();
 
     TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
     TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
-    executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
-
+    for (String type : factory2.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory2);
+    }
     CancellableHandlerFactory factory3 = new CancellableHandlerFactory();
-    executor.registerMessageHandlerFactory(factory3.getMessageType(), factory3);
+    for (String type : factory3.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory3);
+    }
+
     int nMsg1 = 10, nMsg2 = 10, nMsg3 = 10;
     List<Message> msgList = new ArrayList<Message>();
 
     for (int i = 0; i < nMsg1; i++) {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId("*");
       msg.setTgtName("Localhost_1123");
       msg.setSrcName("127.101.1.23_2234");
@@ -471,7 +516,7 @@ public class TestHelixTaskExecutor {
     }
 
     for (int i = 0; i < nMsg2; i++) {
-      Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId("*");
       msgList.add(msg);
       msg.setTgtName("Localhost_1123");
@@ -480,7 +525,7 @@ public class TestHelixTaskExecutor {
     }
 
     for (int i = 0; i < nMsg3; i++) {
-      Message msg = new Message(factory3.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory3.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId("*");
       msgList.add(msg);
       msg.setTgtName("Localhost_1123");
@@ -509,15 +554,16 @@ public class TestHelixTaskExecutor {
     HelixManager manager = new MockClusterManager();
 
     CancellableHandlerFactory factory = new CancellableHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
     NotificationContext changeContext = new NotificationContext(manager);
 
     List<Message> msgList = new ArrayList<Message>();
     int nMsgs2 = 4;
     // Test the case in which retry = 0
     for (int i = 0; i < nMsgs2; i++) {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId("*");
       msg.setTgtName("Localhost_1123");
       msg.setSrcName("127.101.1.23_2234");
@@ -532,7 +578,7 @@ public class TestHelixTaskExecutor {
     AssertJUnit.assertEquals(factory._timedOutMsgIds.size(), 2);
     // AssertJUnit.assertFalse(msgList.get(0).getRecord().getSimpleFields().containsKey("TimeOut"));
     for (int i = 0; i < nMsgs2 - 2; i++) {
-      if (msgList.get(i).getMsgType().equalsIgnoreCase(factory.getMessageType())) {
+      if (factory.getMessageTypes().contains(msgList.get(i).getMsgType())) {
         AssertJUnit.assertTrue(msgList.get(i).getRecord().getSimpleFields()
             .containsKey("Cancelcount"));
         AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
@@ -550,8 +596,9 @@ public class TestHelixTaskExecutor {
     HelixManager manager = new MockClusterManager();
 
     CancellableHandlerFactory factory = new CancellableHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
     NotificationContext changeContext = new NotificationContext(manager);
 
     List<Message> msgList = new ArrayList<Message>();
@@ -561,7 +608,7 @@ public class TestHelixTaskExecutor {
     // Test the case that the message are executed for the second time
     int nMsgs2 = 4;
     for (int i = 0; i < nMsgs2; i++) {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
       msg.setTgtSessionId("*");
       msg.setTgtName("Localhost_1123");
       msg.setSrcName("127.101.1.23_2234");
@@ -615,6 +662,5 @@ public class TestHelixTaskExecutor {
     Thread.sleep(3000);
     AssertJUnit.assertEquals(cancelFactory._processedMsgIds.size(), 0);
     AssertJUnit.assertEquals(stateTransitionFactory._processedMsgIds.size(), 0);
-
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java
index f1848a5..a4fcec6 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java
@@ -1,5 +1,10 @@
 package org.apache.helix.mock.participant;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModel;
@@ -31,15 +36,15 @@ import org.apache.log4j.Logger;
 })
 public class MockDelayMSStateModel extends StateModel {
   private static Logger LOG = Logger.getLogger(MockDelayMSStateModel.class);
-
   private long _delay;
 
   public MockDelayMSStateModel(long delay) {
     _delay = delay;
+    _cancelled = false;
   }
 
   @Transition(to = "SLAVE", from = "OFFLINE")
-  public void onBecomeSLAVEFromOffline(Message message, NotificationContext context) {
+  public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
     if (_delay > 0) {
       try {
         Thread.sleep(_delay);
@@ -50,10 +55,17 @@ public class MockDelayMSStateModel extends StateModel {
     LOG.info("Become SLAVE from OFFLINE");
   }
 
-  @Transition(to = "ONLINE", from = "SLAVE")
-  public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
-    LOG.info("Become ONLINE from SLAVE");
+  @Transition(to = "MASTER", from = "SLAVE")
+  public void onBecomeMasterFromSlave(Message message, NotificationContext context)
+      throws InterruptedException {
+    if (_delay < 0) {
+        Thread.sleep(Math.abs(_delay));
+    }
+    LOG.error("Become MASTER from SLAVE");
   }
 
-
+  @Transition(to = "OFFLINE", from = "SLAVE")
+  public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
+    LOG.info("Become OFFLINE from SLAVE");
+  }
 }