You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/05/18 19:12:37 UTC

[GitHub] [helix] narendly commented on a change in pull request #1000: Add message periodic refresh

narendly commented on a change in pull request #1000:
URL: https://github.com/apache/helix/pull/1000#discussion_r425538729



##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestPerioidicMessageRefresh.java
##########
@@ -0,0 +1,141 @@
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.Message;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPerioidicMessageRefresh extends ZkStandAloneCMTestBase {

Review comment:
       Typo in class name.

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -1135,6 +1145,34 @@ private void removeMessageFromZK(HelixDataAccessor accessor, Message message,
     }
   }
 
+  private void startMessageRefreshThread() {
+    int periodicMessageRefreshInterval =
+        HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.PERIODIC_MESSAGE_REFRESH_INTERVAL, -1);

Review comment:
       Use a NOT_SET constant instead of -1

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -1135,6 +1145,34 @@ private void removeMessageFromZK(HelixDataAccessor accessor, Message message,
     }
   }
 
+  private void startMessageRefreshThread() {
+    int periodicMessageRefreshInterval =
+        HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.PERIODIC_MESSAGE_REFRESH_INTERVAL, -1);
+    if (periodicMessageRefreshInterval > 0) {

Review comment:
       do a comparison with the aforementioned NOT_SET bit..

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestPerioidicMessageRefresh.java
##########
@@ -0,0 +1,141 @@
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.Message;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPerioidicMessageRefresh extends ZkStandAloneCMTestBase {
+  private HelixManager manager;
+  private HelixDataAccessor accessor;
+  private PropertyKey.Builder keyBuilder;
+  private HelixTaskExecutor executor;
+  private TestMessageHandlerFactory factory;
+
+  public class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
+    int _handlersCreated = 0;
+    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();
+
+    class TestMessageHandler extends MessageHandler {
+      public TestMessageHandler(Message message, NotificationContext context) {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        HelixTaskResult result = new HelixTaskResult();
+        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        Thread.sleep(100);

Review comment:
       Let's try to avoid Thread.sleep()

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestPerioidicMessageRefresh.java
##########
@@ -0,0 +1,141 @@
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.Message;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPerioidicMessageRefresh extends ZkStandAloneCMTestBase {
+  private HelixManager manager;
+  private HelixDataAccessor accessor;
+  private PropertyKey.Builder keyBuilder;
+  private HelixTaskExecutor executor;
+  private TestMessageHandlerFactory factory;
+
+  public class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
+    int _handlersCreated = 0;
+    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();
+
+    class TestMessageHandler extends MessageHandler {
+      public TestMessageHandler(Message message, NotificationContext context) {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        HelixTaskResult result = new HelixTaskResult();
+        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        Thread.sleep(100);
+        result.setSuccess(true);
+        return result;
+      }
+
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type) {
+        // TODO Auto-generated method stub
+
+      }
+    }
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
+      // TODO Auto-generated method stub
+      if (message.getMsgSubType() != null && message.getMsgSubType().equals("EXCEPTION")) {
+        throw new HelixException("Test Message handler exception, can ignore");
+      }
+      _handlersCreated++;
+      return new TestMessageHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType() {
+      // TODO Auto-generated method stub
+      return "TestingMessageHandler";
+    }
+
+    @Override
+    public List<String> getMessageTypes() {
+      return Collections.singletonList("TestingMessageHandler");
+    }
+
+    @Override
+    public void reset() {
+      // TODO Auto-generated method stub
+
+    }
+  }
+
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    manager = _participants[0];
+    accessor = manager.getHelixDataAccessor();
+    keyBuilder = accessor.keyBuilder();
+    executor = new HelixTaskExecutor();
+    factory = new TestMessageHandlerFactory();
+    manager.removeListener(keyBuilder.messages(manager.getInstanceName()), _controller);
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
+  }
+
+  @Test
+  public void testPeriodicMessageRefresh() throws Exception {
+    // Set messages in zk
+    List<String> messageIds = new ArrayList<>();
+    int nMsgs1 = 5;
+    for (int i = 0; i < nMsgs1; i++) {
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setTgtName(manager.getInstanceName());
+      msg.setSrcName("S");
+      msg.setCorrelationId(UUID.randomUUID().toString());
+      accessor.setProperty(keyBuilder.message(manager.getInstanceName(), msg.getId()), msg);
+      messageIds.add(msg.getId());
+    }
+
+    // We removed message listeners, so there should be no message processed
+    TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return factory._processedMsgIds.size() == 0;
+      }
+    }, 2000);

Review comment:
       Instead of 2000, don't we have a constant we use in TestHelper? Let's use that.

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestPerioidicMessageRefresh.java
##########
@@ -0,0 +1,141 @@
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.Message;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPerioidicMessageRefresh extends ZkStandAloneCMTestBase {
+  private HelixManager manager;
+  private HelixDataAccessor accessor;
+  private PropertyKey.Builder keyBuilder;
+  private HelixTaskExecutor executor;
+  private TestMessageHandlerFactory factory;
+
+  public class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
+    int _handlersCreated = 0;
+    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();
+
+    class TestMessageHandler extends MessageHandler {
+      public TestMessageHandler(Message message, NotificationContext context) {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        HelixTaskResult result = new HelixTaskResult();
+        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        Thread.sleep(100);
+        result.setSuccess(true);
+        return result;
+      }
+
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type) {
+        // TODO Auto-generated method stub
+
+      }
+    }
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
+      // TODO Auto-generated method stub
+      if (message.getMsgSubType() != null && message.getMsgSubType().equals("EXCEPTION")) {
+        throw new HelixException("Test Message handler exception, can ignore");
+      }
+      _handlersCreated++;
+      return new TestMessageHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType() {
+      // TODO Auto-generated method stub
+      return "TestingMessageHandler";
+    }
+
+    @Override
+    public List<String> getMessageTypes() {
+      return Collections.singletonList("TestingMessageHandler");
+    }
+
+    @Override
+    public void reset() {
+      // TODO Auto-generated method stub
+
+    }
+  }
+
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    manager = _participants[0];
+    accessor = manager.getHelixDataAccessor();
+    keyBuilder = accessor.keyBuilder();
+    executor = new HelixTaskExecutor();
+    factory = new TestMessageHandlerFactory();
+    manager.removeListener(keyBuilder.messages(manager.getInstanceName()), _controller);
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
+  }
+
+  @Test
+  public void testPeriodicMessageRefresh() throws Exception {

Review comment:
       Overall, a good idea to add a quick JavaDoc on what you're going to test and how.

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestPerioidicMessageRefresh.java
##########
@@ -0,0 +1,141 @@
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.Message;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPerioidicMessageRefresh extends ZkStandAloneCMTestBase {
+  private HelixManager manager;
+  private HelixDataAccessor accessor;
+  private PropertyKey.Builder keyBuilder;
+  private HelixTaskExecutor executor;
+  private TestMessageHandlerFactory factory;
+
+  public class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
+    int _handlersCreated = 0;
+    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();
+
+    class TestMessageHandler extends MessageHandler {
+      public TestMessageHandler(Message message, NotificationContext context) {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        HelixTaskResult result = new HelixTaskResult();
+        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        Thread.sleep(100);
+        result.setSuccess(true);
+        return result;
+      }
+
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type) {
+        // TODO Auto-generated method stub
+
+      }
+    }
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
+      // TODO Auto-generated method stub
+      if (message.getMsgSubType() != null && message.getMsgSubType().equals("EXCEPTION")) {
+        throw new HelixException("Test Message handler exception, can ignore");
+      }
+      _handlersCreated++;
+      return new TestMessageHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType() {
+      // TODO Auto-generated method stub
+      return "TestingMessageHandler";
+    }
+
+    @Override
+    public List<String> getMessageTypes() {
+      return Collections.singletonList("TestingMessageHandler");
+    }
+
+    @Override
+    public void reset() {
+      // TODO Auto-generated method stub
+
+    }
+  }
+
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    manager = _participants[0];
+    accessor = manager.getHelixDataAccessor();
+    keyBuilder = accessor.keyBuilder();
+    executor = new HelixTaskExecutor();
+    factory = new TestMessageHandlerFactory();
+    manager.removeListener(keyBuilder.messages(manager.getInstanceName()), _controller);
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
+  }
+
+  @Test
+  public void testPeriodicMessageRefresh() throws Exception {
+    // Set messages in zk
+    List<String> messageIds = new ArrayList<>();
+    int nMsgs1 = 5;
+    for (int i = 0; i < nMsgs1; i++) {
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setTgtName(manager.getInstanceName());
+      msg.setSrcName("S");
+      msg.setCorrelationId(UUID.randomUUID().toString());
+      accessor.setProperty(keyBuilder.message(manager.getInstanceName(), msg.getId()), msg);
+      messageIds.add(msg.getId());
+    }
+
+    // We removed message listeners, so there should be no message processed
+    TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return factory._processedMsgIds.size() == 0;
+      }
+    }, 2000);
+
+    // Start periodic message refresh
+    Properties properties = new Properties();
+    properties
+        .setProperty(SystemPropertyKeys.PERIODIC_MESSAGE_REFRESH_INTERVAL, String.valueOf(20));
+    System.setProperties(properties);
+
+    Method startMessageRefreshThread = HelixTaskExecutor.class
+        .getDeclaredMethod("startMessageRefreshThread");
+    startMessageRefreshThread.setAccessible(true);
+    startMessageRefreshThread.invoke(executor);
+
+    // Now we should be able to get the messages and process them
+    TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return factory._processedMsgIds.size() == messageIds.size();
+      }
+    }, 2000);

Review comment:
       Instead of 2000, don't we have a constant we use in TestHelper? Let's use that.

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -152,6 +155,11 @@ public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor) {
 
   public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor,
       MessageQueueMonitor messageQueueMonitor) {
+    this(participantStatusMonitor, messageQueueMonitor, null);
+  }
+
+  public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor,
+      MessageQueueMonitor messageQueueMonitor, HelixManager manager) {

Review comment:
       Do we really need to pass in the reference to the manager here?

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -1135,6 +1145,34 @@ private void removeMessageFromZK(HelixDataAccessor accessor, Message message,
     }
   }
 
+  private void startMessageRefreshThread() {
+    int periodicMessageRefreshInterval =
+        HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.PERIODIC_MESSAGE_REFRESH_INTERVAL, -1);
+    if (periodicMessageRefreshInterval > 0) {
+      _messageRefreshTimer = new Timer();
+      try {
+        {
+          if (_manager == null) {
+            throw new HelixException("Periodic message refresh skipped. No helix manager is set.");
+          }
+        }
+        TimerTask messageRefreshTask = new TimerTask() {
+          @Override
+          public void run() {
+            // Trigger read messages from zk
+            NotificationContext changeContext = new NotificationContext(_manager);
+            changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+            onMessage(_manager.getInstanceName(), null, changeContext);
+          }
+        };
+        _messageRefreshTimer.schedule(messageRefreshTask, periodicMessageRefreshInterval,
+            periodicMessageRefreshInterval);

Review comment:
       I am not sure if we're doing this right.
   
   Usually, this kind of periodic refresh pattern works when there haven't been any recent message reads - for example, only trigger this when there's no messages read within the last 20 minutes (for example). 
   
   This will be problematic because we are just adding message read events no matter how recent we've read messages. To illustrate, if we just read messages, we don't need to refresh messages again for the duration of `periodicMessageRefreshInterval`.
   
   The timer/scheduler should always reset the timer whenever `onMessage` gets invoked. That way, we don't need to make onMessage synchronized necessarily.
   
   You can refer to the example of periodic refresh for Helix controller.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org