You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/03/25 04:59:34 UTC

helix git commit: [HELIX-681] don't fail state transition task if we fail to remove message or send out relay message

Repository: helix
Updated Branches:
  refs/heads/master 4d652eb9a -> c1ab0b5ed


[HELIX-681] don't fail state transition task if we fail to remove message or send out relay message


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c1ab0b5e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c1ab0b5e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c1ab0b5e

Branch: refs/heads/master
Commit: c1ab0b5ed27c777ef63bfc7247415b6928e72906
Parents: 4d652eb
Author: Harry Zhang <zh...@usc.edu>
Authored: Mon Mar 19 17:35:46 2018 -0700
Committer: Harry Zhang <zh...@usc.edu>
Committed: Sat Mar 24 21:31:38 2018 -0700

----------------------------------------------------------------------
 .../helix/messaging/handling/HelixTask.java     | 20 +++++++++----------
 .../messaging/handling/HelixTaskExecutor.java   |  8 +++++---
 .../java/org/apache/helix/util/HelixUtil.java   | 21 +++++++++++++++++++-
 3 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c1ab0b5e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 85665c1..6091e53 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -42,6 +42,7 @@ import org.apache.helix.monitoring.StateTransitionContext;
 import org.apache.helix.monitoring.StateTransitionDataPoint;
 import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
 import org.apache.helix.task.TaskResult;
+import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -168,7 +169,14 @@ public class HelixTask implements MessageTask {
 
       // forward relay messages attached to this message to other participants
       if (taskResult.isSuccess()) {
-        forwardRelayMessages(accessor, _message, taskResult.getCompleteTime());
+        try {
+          forwardRelayMessages(accessor, _message, taskResult.getCompleteTime());
+        } catch (Exception e) {
+          // Failing to send relay messages should not result in a task execution failure.
+          // Currently we don't log the error to ZK, to reduce writes: when the accessor
+          // throws an exception, ZK might not be in good condition.
+          logger.warn("Failed to send relay messages.", e);
+        }
       }
 
       if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
@@ -204,15 +212,7 @@ public class HelixTask implements MessageTask {
   }
 
   private void removeMessageFromZk(HelixDataAccessor accessor, Message message) {
-    Builder keyBuilder = accessor.keyBuilder();
-    PropertyKey msgKey;
-    if (message.getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name())) {
-      msgKey = keyBuilder.controllerMessage(message.getMsgId());
-    } else {
-      msgKey = keyBuilder.message(_manager.getInstanceName(), message.getMsgId());
-    }
-    boolean success = accessor.removeProperty(msgKey);
-    if (!success) {
+    if (!HelixUtil.removeMessageFromZK(accessor, message, _manager.getInstanceName())) {
       logger.warn("Failed to delete message " + message.getId() + " from zk!");
     } else {
       logger.info("Delete message " + message.getId() + " from zk!");

http://git-wip-us.apache.org/repos/asf/helix/blob/c1ab0b5e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 064b6fd..2f0e087 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -69,6 +69,7 @@ import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor.ProcessedMes
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1059,9 +1060,10 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
   private void removeMessageFromZK(HelixDataAccessor accessor, Message message,
       String instanceName) {
-    boolean success = accessor.removeProperty(message.getKey(accessor.keyBuilder(), instanceName));
-    if (!success) {
-      LOG.warn("Failed to remove message " + message.getId() + " from zk!");
+    if (HelixUtil.removeMessageFromZK(accessor, message, instanceName)) {
+      LOG.info("Successfully removed message {} from ZK.", message.getMsgId());
+    } else {
+      LOG.warn("Failed to remove message {} from ZK.", message.getMsgId());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c1ab0b5e/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 79cff0e..d0be481 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -28,7 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyType;
 import org.apache.helix.controller.rebalancer.AbstractRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -37,6 +37,7 @@ import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -219,4 +220,22 @@ public final class HelixUtil {
 
     return idealStateMap;
   }
+
+  /**
+   * Remove the given message from ZK using the given accessor. This method does
+   * not throw exceptions; any failure is logged and reported as {@code false}.
+   * @param accessor HelixDataAccessor
+   * @param msg message to remove
+   * @param instanceName name of the instance on which the message sits
+   * @return true if the message was removed successfully, false otherwise
+   */
+  public static boolean removeMessageFromZK(HelixDataAccessor accessor, Message msg,
+      String instanceName) {
+    try {
+      return accessor.removeProperty(msg.getKey(accessor.keyBuilder(), instanceName));
+    } catch (Exception e) {
+      LOG.error("Caught exception while removing message {}.", msg, e);
+    }
+    return false;
+  }
 }