You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/31 20:50:49 UTC

[2/2] helix git commit: Improve message handling properly

Improve message handling properly

We need some improvements on the client side to help stabilize message handling. Currently, when processing a message, Helix marks the message as read before its task has actually been scheduled.

In this case, if scheduling fails after the message has already been marked as read, nothing will take care of the message, and it will hang there forever.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1103fecb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1103fecb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1103fecb

Branch: refs/heads/master
Commit: 1103fecb67def5e610a7b22636ba4ac25e23777b
Parents: 9659370
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Sep 17 11:51:31 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Oct 31 13:50:37 2018 -0700

----------------------------------------------------------------------
 .../helix/messaging/handling/HelixTaskExecutor.java  | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1103fecb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 5e2082c..3ae90d3 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -952,22 +952,25 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
+      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
         Message msg = handler._message;
-        scheduleTask(
-            new HelixTask(msg, context, handler, this)
-        );
+        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
+          removeMessageFromTaskAndFutureMap(msg);
+          removeMessageFromZK(accessor, msg, instanceName);
+        }
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
         Message msg = handler._message;
-        scheduleTask(
-            new HelixTask(msg, context, handler, this)
-        );
+        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
+          removeMessageFromTaskAndFutureMap(msg);
+          removeMessageFromZK(accessor, msg, instanceName);
+        }
       }
     }
   }