You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/03/25 04:59:34 UTC
helix git commit: [HELIX-681] don't fail state transition task if we
fail to remove message or send out relay message
Repository: helix
Updated Branches:
refs/heads/master 4d652eb9a -> c1ab0b5ed
[HELIX-681] don't fail state transition task if we fail to remove message or send out relay message
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c1ab0b5e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c1ab0b5e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c1ab0b5e
Branch: refs/heads/master
Commit: c1ab0b5ed27c777ef63bfc7247415b6928e72906
Parents: 4d652eb
Author: Harry Zhang <zh...@usc.edu>
Authored: Mon Mar 19 17:35:46 2018 -0700
Committer: Harry Zhang <zh...@usc.edu>
Committed: Sat Mar 24 21:31:38 2018 -0700
----------------------------------------------------------------------
.../helix/messaging/handling/HelixTask.java | 20 +++++++++----------
.../messaging/handling/HelixTaskExecutor.java | 8 +++++---
.../java/org/apache/helix/util/HelixUtil.java | 21 +++++++++++++++++++-
3 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c1ab0b5e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 85665c1..6091e53 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -42,6 +42,7 @@ import org.apache.helix.monitoring.StateTransitionContext;
import org.apache.helix.monitoring.StateTransitionDataPoint;
import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
import org.apache.helix.task.TaskResult;
+import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StatusUpdateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,7 +169,14 @@ public class HelixTask implements MessageTask {
// forward relay messages attached to this message to other participants
if (taskResult.isSuccess()) {
- forwardRelayMessages(accessor, _message, taskResult.getCompleteTime());
+ try {
+ forwardRelayMessages(accessor, _message, taskResult.getCompleteTime());
+ } catch (Exception e) {
+ // Fail to send relay message should not result in a task execution failure
+ // Currently we don't log error to ZK to reduce writes as when accessor throws
+ // exception, ZK might not be in good condition.
+ logger.warn("Failed to send relay messages.", e);
+ }
}
if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
@@ -204,15 +212,7 @@ public class HelixTask implements MessageTask {
}
private void removeMessageFromZk(HelixDataAccessor accessor, Message message) {
- Builder keyBuilder = accessor.keyBuilder();
- PropertyKey msgKey;
- if (message.getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name())) {
- msgKey = keyBuilder.controllerMessage(message.getMsgId());
- } else {
- msgKey = keyBuilder.message(_manager.getInstanceName(), message.getMsgId());
- }
- boolean success = accessor.removeProperty(msgKey);
- if (!success) {
+ if (!HelixUtil.removeMessageFromZK(accessor, message, _manager.getInstanceName())) {
logger.warn("Failed to delete message " + message.getId() + " from zk!");
} else {
logger.info("Delete message " + message.getId() + " from zk!");
http://git-wip-us.apache.org/repos/asf/helix/blob/c1ab0b5e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 064b6fd..2f0e087 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -69,6 +69,7 @@ import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor.ProcessedMes
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StatusUpdateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1059,9 +1060,10 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
+ /**
+ * Best-effort removal of a message from ZK, delegating to
+ * {@link HelixUtil#removeMessageFromZK}. Logs at info level when the removal succeeds
+ * and at warn level when it fails; failure is not reported to the caller.
+ */
private void removeMessageFromZK(HelixDataAccessor accessor, Message message,
String instanceName) {
- boolean success = accessor.removeProperty(message.getKey(accessor.keyBuilder(), instanceName));
- if (!success) {
- LOG.warn("Failed to remove message " + message.getId() + " from zk!");
+ if (HelixUtil.removeMessageFromZK(accessor, message, instanceName)) {
+ LOG.info("Successfully removed message {} from ZK.", message.getMsgId());
+ } else {
+ LOG.warn("Failed to remove message {} from ZK.", message.getMsgId());
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1ab0b5e/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 79cff0e..d0be481 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -28,7 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyType;
import org.apache.helix.controller.rebalancer.AbstractRebalancer;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -37,6 +37,7 @@ import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -219,4 +220,22 @@ public final class HelixUtil {
return idealStateMap;
}
+
+ /**
+ * Remove the given message from ZK using the given accessor. Any {@link Exception}
+ * thrown while building the message key or removing the property is caught and
+ * logged at error level, and {@code false} is returned instead of propagating it.
+ * @param accessor HelixDataAccessor used to build the message key and remove it
+ * @param msg message to remove
+ * @param instanceName name of the instance on which the message sits; passed to
+ * {@link Message#getKey} to resolve the message's property key
+ * @return {@code true} if {@code accessor.removeProperty} reports success;
+ * {@code false} if it reports failure or an exception was caught
+ */
+ public static boolean removeMessageFromZK(HelixDataAccessor accessor, Message msg,
+ String instanceName) {
+ try {
+ return accessor.removeProperty(msg.getKey(accessor.keyBuilder(), instanceName));
+ } catch (Exception e) {
+ // Deliberate best-effort: callers only log a failed delete, so never propagate.
+ // NOTE(review): this logs msg's toString(); consider msg.getMsgId() if that is verbose.
+ LOG.error("Caught exception while removing message {}.", msg, e);
+ }
+ return false;
+ }
}