You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/05/14 18:23:55 UTC

[GitHub] [helix] pkuwm commented on a change in pull request #1000: Add message periodic refresh

pkuwm commented on a change in pull request #1000:
URL: https://github.com/apache/helix/pull/1000#discussion_r425319951



##########
File path: helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
##########
@@ -78,4 +78,7 @@
   // System Property Metadata Store Directory Server endpoint key
   public static final String MSDS_SERVER_ENDPOINT_KEY =
       MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY;
+
+  public static final String PERIODIC_MESSAGE_REFRESH_INTERVAL =
+      "helix.messaging.handling.HelixTaskExecutor.periodicMessageRefreshInterval.ms";

Review comment:
       I feel this name is a bit too long. Is it specific for HelixManager? If so, can we rename it like `helix.manager.message.refresh.interval.ms`,`helix.task.executor.message.interval.ms` or something like that?

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
##########
@@ -70,7 +70,7 @@ public DefaultMessagingService(HelixManager manager) {
 
     _taskExecutor = new HelixTaskExecutor(
         new ParticipantStatusMonitor(isParticipant, manager.getInstanceName()),
-        new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName()));
+        new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName()), manager);

Review comment:
       +1. Suggest not passing the manager to the method.

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -1135,6 +1145,34 @@ private void removeMessageFromZK(HelixDataAccessor accessor, Message message,
     }
   }
 
+  private void startMessageRefreshThread() {
+    int periodicMessageRefreshInterval =
+        HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.PERIODIC_MESSAGE_REFRESH_INTERVAL, -1);
+    if (periodicMessageRefreshInterval > 0) {
+      _messageRefreshTimer = new Timer();
+      try {
+        {
+          if (_manager == null) {

Review comment:
       Do you think it may be cleaner if we check this null in constructor instead and not start this thread if null, which avoids this throw HelixException, catch and log?

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestPerioidicMessageRefresh.java
##########
@@ -0,0 +1,141 @@
+package org.apache.helix.messaging.handling;
+

Review comment:
       Don't forget Apache license :)

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestPerioidicMessageRefresh.java
##########
@@ -0,0 +1,141 @@
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.Message;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPerioidicMessageRefresh extends ZkStandAloneCMTestBase {
+  private HelixManager manager;
+  private HelixDataAccessor accessor;
+  private PropertyKey.Builder keyBuilder;
+  private HelixTaskExecutor executor;
+  private TestMessageHandlerFactory factory;
+
+  public class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
+    int _handlersCreated = 0;
+    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();
+
+    class TestMessageHandler extends MessageHandler {
+      public TestMessageHandler(Message message, NotificationContext context) {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        HelixTaskResult result = new HelixTaskResult();
+        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        Thread.sleep(100);

Review comment:
       What's the purpose of sleep?

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestPerioidicMessageRefresh.java
##########
@@ -0,0 +1,141 @@
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.Message;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPerioidicMessageRefresh extends ZkStandAloneCMTestBase {
+  private HelixManager manager;
+  private HelixDataAccessor accessor;
+  private PropertyKey.Builder keyBuilder;
+  private HelixTaskExecutor executor;
+  private TestMessageHandlerFactory factory;
+
+  public class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
+    int _handlersCreated = 0;
+    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();

Review comment:
       Use interface: `Map<String, String> _processedMsgIds`?

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestPerioidicMessageRefresh.java
##########
@@ -0,0 +1,141 @@
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.Message;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPerioidicMessageRefresh extends ZkStandAloneCMTestBase {
+  private HelixManager manager;
+  private HelixDataAccessor accessor;
+  private PropertyKey.Builder keyBuilder;
+  private HelixTaskExecutor executor;
+  private TestMessageHandlerFactory factory;

Review comment:
       Underscore as prefix to be consistent?

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestPerioidicMessageRefresh.java
##########
@@ -0,0 +1,141 @@
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.Message;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPerioidicMessageRefresh extends ZkStandAloneCMTestBase {
+  private HelixManager manager;
+  private HelixDataAccessor accessor;
+  private PropertyKey.Builder keyBuilder;
+  private HelixTaskExecutor executor;
+  private TestMessageHandlerFactory factory;
+
+  public class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
+    int _handlersCreated = 0;
+    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();
+
+    class TestMessageHandler extends MessageHandler {
+      public TestMessageHandler(Message message, NotificationContext context) {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        HelixTaskResult result = new HelixTaskResult();
+        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        Thread.sleep(100);
+        result.setSuccess(true);
+        return result;
+      }
+
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type) {
+        // TODO Auto-generated method stub
+
+      }
+    }
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
+      // TODO Auto-generated method stub
+      if (message.getMsgSubType() != null && message.getMsgSubType().equals("EXCEPTION")) {
+        throw new HelixException("Test Message handler exception, can ignore");
+      }
+      _handlersCreated++;
+      return new TestMessageHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType() {
+      // TODO Auto-generated method stub
+      return "TestingMessageHandler";
+    }
+
+    @Override
+    public List<String> getMessageTypes() {
+      return Collections.singletonList("TestingMessageHandler");
+    }
+
+    @Override
+    public void reset() {
+      // TODO Auto-generated method stub
+
+    }
+  }
+
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    manager = _participants[0];
+    accessor = manager.getHelixDataAccessor();
+    keyBuilder = accessor.keyBuilder();
+    executor = new HelixTaskExecutor();
+    factory = new TestMessageHandlerFactory();
+    manager.removeListener(keyBuilder.messages(manager.getInstanceName()), _controller);
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
+  }
+
+  @Test
+  public void testPeriodicMessageRefresh() throws Exception {
+    // Set messages in zk
+    List<String> messageIds = new ArrayList<>();
+    int nMsgs1 = 5;
+    for (int i = 0; i < nMsgs1; i++) {
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setTgtName(manager.getInstanceName());
+      msg.setSrcName("S");
+      msg.setCorrelationId(UUID.randomUUID().toString());
+      accessor.setProperty(keyBuilder.message(manager.getInstanceName(), msg.getId()), msg);
+      messageIds.add(msg.getId());
+    }
+
+    // We removed message listeners, so there should be no message processed
+    TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return factory._processedMsgIds.size() == 0;
+      }
+    }, 2000);
+
+    // Start periodic message refresh
+    Properties properties = new Properties();
+    properties
+        .setProperty(SystemPropertyKeys.PERIODIC_MESSAGE_REFRESH_INTERVAL, String.valueOf(20));
+    System.setProperties(properties);

Review comment:
       I suggest using  `setProperty()` as just one single property is set.
   It is a good habit to clear the system property after this test so it eliminates possibility of affecting other tests after this one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org