You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/11/01 23:02:21 UTC

[7/8] helix git commit: Improve helix message timeout task

Improve helix message timeout task

From the logs and code, this appears to be a very rare race condition in which the message has actually been processed and completed, but has not yet been removed. Once handling is completed, the message handling thread should cancel the timeout task, which runs in a separate thread. But just before it tried to cancel the task, the message timed out and the message handling thread was interrupted by the timeout task thread, as shown in the log.

As a result, the message handling thread did not catch the interrupted exception at that moment and failed to remove the message from ZK, leaving it there in READ state. After I manually removed the message, we got an error log showing that the partition is already LEADER. That confirms the assumption that the message had been successfully processed.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1507f016
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1507f016
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1507f016

Branch: refs/heads/master
Commit: 1507f0161df24f4bec3ba3632b2d23a7a9bed5d4
Parents: c783ae7
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Oct 10 16:59:28 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 14:38:58 2018 -0700

----------------------------------------------------------------------
 .../helix/messaging/handling/HelixTask.java     | 28 +++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1507f016/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 2f3d805..fb55e76 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -22,7 +22,6 @@ package org.apache.helix.messaging.handling;
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -96,6 +95,9 @@ public class HelixTask implements MessageTask {
       handlerStart = System.currentTimeMillis();
       taskResult = _handler.handleMessage();
       handlerEnd = System.currentTimeMillis();
+
+      // cancel timeout task
+      _executor.cancelTimeoutTask(this);
     } catch (InterruptedException e) {
       taskResult = new HelixTaskResult();
       taskResult.setException(e);
@@ -116,9 +118,6 @@ public class HelixTask implements MessageTask {
       _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, _manager);
     }
 
-    // cancel timeout task
-    _executor.cancelTimeoutTask(this);
-
     Exception exception = null;
     try {
       if (taskResult.isSuccess()) {
@@ -182,13 +181,9 @@ public class HelixTask implements MessageTask {
         }
       }
 
-      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
-        removeMessageFromZk(accessor, _message);
-        reportMessageStat(_manager, _message, taskResult);
-        sendReply(getSrcClusterDataAccessor(_message), _message, taskResult);
-        _executor.finishTask(this);
-      }
+      finalCleanup(taskResult);
     } catch (Exception e) {
+      finalCleanup(taskResult);
       exception = e;
       type = ErrorType.FRAMEWORK;
       code = ErrorCode.ERROR;
@@ -377,4 +372,17 @@ public class HelixTask implements MessageTask {
     }
     _isStarted = true;
   }
+
+  private void finalCleanup(HelixTaskResult taskResult) {
+    try {
+      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
+        removeMessageFromZk(_manager.getHelixDataAccessor(), _message);
+        reportMessageStat(_manager, _message, taskResult);
+        sendReply(getSrcClusterDataAccessor(_message), _message, taskResult);
+        _executor.finishTask(this);
+      }
+    } catch (Exception e) {
+      logger.error(String.format("Error to final clean up for message : %s", _message.getId()));
+    }
+  }
 }