Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/05/27 04:04:22 UTC

[GitHub] [helix] mgao0 commented on a change in pull request #1000: Add message periodic refresh

mgao0 commented on a change in pull request #1000:
URL: https://github.com/apache/helix/pull/1000#discussion_r430762033



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -210,12 +245,25 @@ protected void handleEvent(NotificationContext event) {
 
   public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,
       Object listener, EventType[] eventTypes, ChangeType changeType) {
-    this(manager, client, propertyKey, listener, eventTypes, changeType, null);
+    this(manager, client, propertyKey, listener, eventTypes, changeType, null, -1);

Review comment:
       Nice catch. Fixed.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -202,6 +214,29 @@ protected void handleEvent(NotificationContext event) {
     }
   }
 
+  class RefreshTask implements Runnable {
+    @Override
+    public void run() {
+      try {
+        long currentTime = System.currentTimeMillis();
+        if (_lastEventTime + _periodicRefreshInterval <= currentTime) {
+          NotificationContext changeContext = new NotificationContext(_manager);
+          changeContext.setType(NotificationContext.Type.PERIODIC_REFRESH);
+          changeContext.setChangeType(_changeType);
+          enqueueTask(changeContext);
+        } else {
+          long remainingTime = _lastEventTime + _periodicRefreshInterval - currentTime;
+          _scheduledRefreshFuture.cancel(false);
+          _scheduledRefreshFuture = _periodicRefreshExecutor
+              .scheduleWithFixedDelay(this, remainingTime, _periodicRefreshInterval,
+                  TimeUnit.MILLISECONDS);

Review comment:
       Good idea! I changed it to use Thread.sleep. My one concern was that the thread could be interrupted while sleeping, but since there is only one thread here, I think that's fine.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -781,8 +841,15 @@ void reset(boolean isShutdown) {
           if (isShutdown) {
             _batchCallbackProcessor.shutdown();
             _batchCallbackProcessor = null;
+            if (_periodicRefreshExecutor != null) {
+              _periodicRefreshExecutor.shutdownNow();
+            }
           } else {
             _batchCallbackProcessor.resetEventQueue();
+            if (_periodicRefreshExecutor != null) {
+              _periodicRefreshExecutor.shutdownNow();

Review comment:
       Yes, the shutdownNow method automatically cancels the scheduled tasks. I have since changed how reset (without shutdown) works: it now only cancels the scheduled future instead of shutting down the executor and initializing a new one, which I think is more efficient.
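
       For illustration, a minimal sketch of the difference between the two reset strategies, using only java.util.concurrent. This is not the CallbackHandler code from the PR; the class and method names are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not the CallbackHandler code from the PR.
class ResetSketch {
  /** Returns true if the executor still accepts work after the future is cancelled. */
  static boolean reusableAfterCancel() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      ScheduledFuture<?> future =
          executor.scheduleWithFixedDelay(() -> { }, 1, 1, TimeUnit.HOURS);
      // Reset path: cancel only the pending task and keep the executor alive.
      future.cancel(false);
      // The same executor can then be given a fresh schedule.
      ScheduledFuture<?> next =
          executor.scheduleWithFixedDelay(() -> { }, 1, 1, TimeUnit.HOURS);
      return future.isCancelled() && !next.isCancelled();
    } finally {
      executor.shutdownNow(); // clean up the demo thread
    }
  }

  /** Returns true if scheduling after shutdownNow() is rejected. */
  static boolean rejectedAfterShutdownNow() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.shutdownNow();
    try {
      executor.schedule(() -> { }, 1, TimeUnit.HOURS);
      return false;
    } catch (RejectedExecutionException e) {
      // A shut-down executor cannot be reused; a new one would have to be created.
      return true;
    }
  }
}
```

       Cancelling the future keeps the thread and executor available for the next schedule, whereas after shutdownNow a new executor has to be created before anything can be scheduled again.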

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -1135,6 +1145,34 @@ private void removeMessageFromZK(HelixDataAccessor accessor, Message message,
     }
   }
 
+  private void startMessageRefreshThread() {

Review comment:
       Added this comment in CallbackHandler. Thanks for the reminder!

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
##########
@@ -70,7 +70,7 @@ public DefaultMessagingService(HelixManager manager) {
 
     _taskExecutor = new HelixTaskExecutor(
         new ParticipantStatusMonitor(isParticipant, manager.getInstanceName()),
-        new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName()));
+        new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName()), manager);

Review comment:
       Resolving this conversation since we are not doing it this way now.

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -714,7 +724,7 @@ private void syncSessionToController(HelixManager manager) {
 
   @Override
   @PreFetch(enabled = false)
-  public void onMessage(String instanceName, List<Message> messages,
+  public synchronized void onMessage(String instanceName, List<Message> messages,

Review comment:
       Per our conversation, I moved this change from HelixTaskExecutor to CallbackHandler. For refresh, instead of calling onMessage directly, we now enqueue a specific event on the queue, and that event triggers onMessage.
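
       For illustration, a minimal sketch of the enqueue-instead-of-call pattern. The names here are hypothetical and are not Helix APIs. The sketch assumes a single consumer drains the queue, so the handling code runs on one thread and would not need to be synchronized; whether that holds in CallbackHandler is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; these names are not Helix APIs.
class EnqueueSketch {
  enum EventType { CALLBACK, PERIODIC_REFRESH }

  private final BlockingQueue<EventType> queue = new LinkedBlockingQueue<>();

  /** Safe to call from any thread, e.g. a ZooKeeper callback or a refresh timer. */
  void enqueue(EventType type) {
    queue.add(type);
  }

  /** Single consumer: takes up to `expected` events and handles them one at a time. */
  List<EventType> drain(int expected) {
    List<EventType> handled = new ArrayList<>();
    try {
      while (handled.size() < expected) {
        EventType event = queue.poll(1, TimeUnit.SECONDS);
        if (event == null) {
          break; // nothing arrived in time
        }
        handled.add(event); // stand-in for invoking the listener, e.g. onMessage
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
    return handled;
  }

  /** Enqueues one event from the caller and one from a second thread, then drains both. */
  static List<EventType> demo() {
    EnqueueSketch handler = new EnqueueSketch();
    handler.enqueue(EventType.CALLBACK);
    Thread refresher = new Thread(() -> handler.enqueue(EventType.PERIODIC_REFRESH));
    refresher.start();
    try {
      refresher.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return handler.drain(2);
  }
}
```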

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -1135,6 +1145,34 @@ private void removeMessageFromZK(HelixDataAccessor accessor, Message message,
     }
   }
 
+  private void startMessageRefreshThread() {
+    int periodicMessageRefreshInterval =
+        HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.PERIODIC_MESSAGE_REFRESH_INTERVAL, -1);
+    if (periodicMessageRefreshInterval > 0) {
+      _messageRefreshTimer = new Timer();
+      try {
+        {
+          if (_manager == null) {
+            throw new HelixException("Periodic message refresh skipped. No helix manager is set.");
+          }
+        }
+        TimerTask messageRefreshTask = new TimerTask() {
+          @Override
+          public void run() {
+            // Trigger read messages from zk
+            NotificationContext changeContext = new NotificationContext(_manager);
+            changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+            onMessage(_manager.getInstanceName(), null, changeContext);
+          }
+        };
+        _messageRefreshTimer.schedule(messageRefreshTask, periodicMessageRefreshInterval,
+            periodicMessageRefreshInterval);

Review comment:
       Good suggestion. In the current design, I put the check in the invoke method: if the callback (onMessage, in the case of messages) has not been called for a period of time, the handler does the refresh.
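
       For illustration, a minimal sketch of the idle check described above, mirroring the `_lastEventTime + _periodicRefreshInterval <= currentTime` comparison in the RefreshTask hunk. The class, its fields, and the explicit `nowMs` parameter are hypothetical; the clock is passed in only to keep the sketch deterministic.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration; field and method names are not taken from Helix.
class IdleCheckSketch {
  private final long refreshIntervalMs;
  private final AtomicLong lastEventTimeMs;

  IdleCheckSketch(long refreshIntervalMs, long nowMs) {
    this.refreshIntervalMs = refreshIntervalMs;
    this.lastEventTimeMs = new AtomicLong(nowMs);
  }

  /** Called from the invoke path whenever a real callback is delivered. */
  void recordEvent(long nowMs) {
    lastEventTimeMs.set(nowMs);
  }

  /** True when no callback has been seen for at least one full interval. */
  boolean refreshDue(long nowMs) {
    return lastEventTimeMs.get() + refreshIntervalMs <= nowMs;
  }

  /** Time left until a refresh would be due; zero if it is already due. */
  long remainingMs(long nowMs) {
    return Math.max(0L, lastEventTimeMs.get() + refreshIntervalMs - nowMs);
  }
}
```

       With a 1000 ms interval and a callback recorded at t = 1500, a check at t = 2000 is not due and has 500 ms remaining.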

##########
File path: helix-core/src/test/java/org/apache/helix/messaging/handling/TestPerioidicMessageRefresh.java
##########
@@ -0,0 +1,141 @@
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.Message;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPerioidicMessageRefresh extends ZkStandAloneCMTestBase {
+  private HelixManager manager;
+  private HelixDataAccessor accessor;
+  private PropertyKey.Builder keyBuilder;
+  private HelixTaskExecutor executor;
+  private TestMessageHandlerFactory factory;
+
+  public class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
+    int _handlersCreated = 0;
+    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();
+
+    class TestMessageHandler extends MessageHandler {
+      public TestMessageHandler(Message message, NotificationContext context) {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        HelixTaskResult result = new HelixTaskResult();
+        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        Thread.sleep(100);
+        result.setSuccess(true);
+        return result;
+      }
+
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type) {
+        // TODO Auto-generated method stub
+
+      }
+    }
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
+      // TODO Auto-generated method stub
+      if (message.getMsgSubType() != null && message.getMsgSubType().equals("EXCEPTION")) {
+        throw new HelixException("Test Message handler exception, can ignore");
+      }
+      _handlersCreated++;
+      return new TestMessageHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType() {
+      // TODO Auto-generated method stub
+      return "TestingMessageHandler";
+    }
+
+    @Override
+    public List<String> getMessageTypes() {
+      return Collections.singletonList("TestingMessageHandler");
+    }
+
+    @Override
+    public void reset() {
+      // TODO Auto-generated method stub
+
+    }
+  }
+
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    manager = _participants[0];
+    accessor = manager.getHelixDataAccessor();
+    keyBuilder = accessor.keyBuilder();
+    executor = new HelixTaskExecutor();
+    factory = new TestMessageHandlerFactory();
+    manager.removeListener(keyBuilder.messages(manager.getInstanceName()), _controller);
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
+  }
+
+  @Test
+  public void testPeriodicMessageRefresh() throws Exception {
+    // Set messages in zk
+    List<String> messageIds = new ArrayList<>();
+    int nMsgs1 = 5;
+    for (int i = 0; i < nMsgs1; i++) {
+      Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setTgtName(manager.getInstanceName());
+      msg.setSrcName("S");
+      msg.setCorrelationId(UUID.randomUUID().toString());
+      accessor.setProperty(keyBuilder.message(manager.getInstanceName(), msg.getId()), msg);
+      messageIds.add(msg.getId());
+    }
+
+    // We removed message listeners, so there should be no message processed
+    TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return factory._processedMsgIds.size() == 0;
+      }
+    }, 2000);
+
+    // Start periodic message refresh
+    Properties properties = new Properties();
+    properties
+        .setProperty(SystemPropertyKeys.PERIODIC_MESSAGE_REFRESH_INTERVAL, String.valueOf(20));
+    System.setProperties(properties);
+
+    Method startMessageRefreshThread = HelixTaskExecutor.class
+        .getDeclaredMethod("startMessageRefreshThread");
+    startMessageRefreshThread.setAccessible(true);
+    startMessageRefreshThread.invoke(executor);
+
+    // Now we should be able to get the messages and process them
+    TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return factory._processedMsgIds.size() == messageIds.size();
+      }
+    }, 2000);

Review comment:
       Didn't know we have this. Thanks.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -202,6 +212,26 @@ protected void handleEvent(NotificationContext event) {
     }
   }
 
+  class RefreshTask implements Runnable {
+    @Override
+    public void run() {
+      try {
+        long currentTime = System.currentTimeMillis();
+        if (_lastEventTime + _periodicRefreshInterval <= currentTime) {
+          NotificationContext changeContext = new NotificationContext(_manager);
+          changeContext.setType(NotificationContext.Type.PERIODIC_REFRESH);
+          changeContext.setChangeType(_changeType);
+          enqueueTask(changeContext);
+        } else {
+          long remainingTime = _lastEventTime + _periodicRefreshInterval - currentTime;
+          Thread.sleep(remainingTime);

Review comment:
       Didn't think of that! Thanks for catching this. In that case I would prefer my previous approach, since the framework handles it better. I also looked into the cost: cancelling a future and scheduling another one merely removes a task from the workQueue and puts a newly created task into it, so it won't be too costly, especially with a relatively long interval (20 or 30 minutes).
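
       For illustration, a minimal sketch of what cancel-and-reschedule does to the work queue of a ScheduledThreadPoolExecutor. The class and method names are hypothetical. One caveat worth stating: by default the JDK leaves a cancelled task in the queue until its delay elapses; it is removed immediately only when setRemoveOnCancelPolicy(true) is set.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not the code from the PR.
class RescheduleSketch {
  /** Returns the work-queue size after cancelling one task and scheduling another. */
  static int queueSizeAfterReschedule(boolean removeOnCancel) {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    executor.setRemoveOnCancelPolicy(removeOnCancel);
    try {
      ScheduledFuture<?> first =
          executor.scheduleWithFixedDelay(() -> { }, 30, 30, TimeUnit.MINUTES);
      // Cancel the pending refresh without interrupting a running one.
      first.cancel(false);
      // Schedule the replacement with a shorter initial delay (the remaining time).
      executor.scheduleWithFixedDelay(() -> { }, 10, 30, TimeUnit.MINUTES);
      return executor.getQueue().size();
    } finally {
      executor.shutdownNow(); // clean up the demo thread
    }
  }
}
```

       With the policy enabled the queue holds only the replacement task; with the default it also holds the cancelled one until its delay expires, which is a small cost when rescheduling happens at most a few times per interval.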




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


