You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/09/17 22:26:54 UTC
[1/3] helix git commit: Fix P2P message logic in controller to avoid
sending duplicated messages to participants.
Repository: helix
Updated Branches:
refs/heads/master f1c503712 -> 74145e8ad
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index a88965b..c8048fa 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -21,6 +21,7 @@ package org.apache.helix.messaging.p2pMessage;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
import org.apache.helix.HelixConstants;
import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.pipeline.Pipeline;
@@ -54,8 +55,8 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
private void preSetup() {
- setupIdealState(3, new String[]{db}, numPartition, numReplica, IdealState.RebalanceMode.SEMI_AUTO,
- BuiltInStateModelDefinitions.MasterSlave.name());
+ setupIdealState(3, new String[] { db }, numPartition, numReplica,
+ IdealState.RebalanceMode.SEMI_AUTO, BuiltInStateModelDefinitions.MasterSlave.name());
setupStateModel();
setupInstances(3);
setupLiveInstances(3);
@@ -77,10 +78,194 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
testP2PMessage(null, false);
}
- private void testP2PMessage(ClusterConfig clusterConfig, Boolean p2pMessageEnabled) throws Exception {
- Map<String, Resource> resourceMap =
- getResourceMap(new String[]{db}, numPartition, BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig,
- null);
+ @Test
+ public void testAvoidDuplicatedMessageWithP2PEnabled() throws Exception {
+ preSetup();
+ ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
+ clusterConfig.enableP2PMessage(true);
+ setClusterConfig(clusterConfig);
+
+ Map<String, Resource> resourceMap = getResourceMap(new String[] { db }, numPartition,
+ BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig, null);
+
+ ClusterDataCache cache = new ClusterDataCache();
+ cache.setAsyncTasksThreadPool(Executors.newSingleThreadExecutor());
+ event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+ event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+ event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+ event.addAttribute(AttributeName.helixmanager.name(), manager);
+
+ Pipeline pipeline = createPipeline();
+ pipeline.handle(event);
+
+ BestPossibleStateOutput bestPossibleStateOutput =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+ CurrentStateOutput currentStateOutput =
+ populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ Partition p = new Partition(db + "_0");
+
+ String masterInstance = getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+ MasterSlaveSMD.States.MASTER.name());
+ Assert.assertNotNull(masterInstance);
+
+ admin.enableInstance(_clusterName, masterInstance, false);
+ cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+ cache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
+
+ pipeline = createPipeline();
+ pipeline.handle(event);
+
+ bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+ MessageSelectionStageOutput messageOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ List<Message> messages = messageOutput.getMessages(db, p);
+
+ Assert.assertEquals(messages.size(), 1);
+ Message toSlaveMessage = messages.get(0);
+ Assert.assertEquals(toSlaveMessage.getTgtName(), masterInstance);
+ Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
+ Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
+
+ // verify p2p message sent to the old master instance
+ Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);
+ String newMasterInstance =
+ getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+ MasterSlaveSMD.States.MASTER.name());
+
+ Message relayMessage = toSlaveMessage.getRelayMessage(newMasterInstance);
+ Assert.assertNotNull(relayMessage);
+ Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+ Assert.assertEquals(relayMessage.getTgtName(), newMasterInstance);
+ Assert.assertEquals(relayMessage.getRelaySrcHost(), masterInstance);
+ Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+ Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
+
+
+ // test the old master finish state transition, but has not forward p2p message yet.
+ currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
+ currentStateOutput.setPendingMessage(db, p, masterInstance, toSlaveMessage);
+ currentStateOutput.setPendingRelayMessage(db, p, masterInstance, relayMessage);
+
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ pipeline.handle(event);
+
+ messageOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ messages = messageOutput.getMessages(db, p);
+ Assert.assertEquals(messages.size(), 0);
+
+
+ currentStateOutput =
+ populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+ currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
+ currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ pipeline.handle(event);
+
+ messageOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ messages = messageOutput.getMessages(db, p);
+ Assert.assertEquals(messages.size(), 1);
+
+ Message toOfflineMessage = messages.get(0);
+ Assert.assertEquals(toOfflineMessage.getTgtName(), masterInstance);
+ Assert.assertEquals(toOfflineMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+ Assert.assertEquals(toOfflineMessage.getToState(), MasterSlaveSMD.States.OFFLINE.name());
+
+
+ // Now, the old master finish state transition, but has not forward p2p message yet.
+ // Then the preference list has changed, so now the new master is different from previously calculated new master
+ // but controller should not send S->M to newly calculated master.
+ currentStateOutput.setCurrentState(db, p, masterInstance, "OFFLINE");
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ String slaveInstance =
+ getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+ MasterSlaveSMD.States.SLAVE.name());
+
+ Map<String, String> instanceStateMap = bestPossibleStateOutput.getInstanceStateMap(db, p);
+ instanceStateMap.put(newMasterInstance, "SLAVE");
+ instanceStateMap.put(slaveInstance, "MASTER");
+ bestPossibleStateOutput.setState(db, p, instanceStateMap);
+
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+
+ pipeline = new Pipeline("test");
+ pipeline.addStage(new IntermediateStateCalcStage());
+ pipeline.addStage(new MessageGenerationPhase());
+ pipeline.addStage(new MessageSelectionStage());
+ pipeline.addStage(new MessageThrottleStage());
+
+ pipeline.handle(event);
+
+ messageOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ messages = messageOutput.getMessages(db, p);
+ Assert.assertEquals(messages.size(), 0);
+
+
+ // Now, the old master has forwarded the p2p master to previously calculated master,
+ // So the state-transition still happened in previously calculated master.
+ // Controller will not send S->M to new master.
+ currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+ event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), bestPossibleStateOutput);
+
+
+ pipeline = new Pipeline("test");
+ pipeline.addStage(new IntermediateStateCalcStage());
+ pipeline.addStage(new MessageGenerationPhase());
+ pipeline.addStage(new MessageSelectionStage());
+ pipeline.addStage(new MessageThrottleStage());
+
+ pipeline.handle(event);
+
+ messageOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ messages = messageOutput.getMessages(db, p);
+ Assert.assertEquals(messages.size(), 0);
+
+
+ // now, the previous calculated master completed the state transition and deleted the p2p message.
+ // Controller should drop this master first.
+ currentStateOutput =
+ populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+ currentStateOutput.setCurrentState(db, p, newMasterInstance, "MASTER");
+ currentStateOutput.setCurrentState(db, p, slaveInstance, "SLAVE");
+
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+ pipeline = new Pipeline("test");
+ pipeline.addStage(new MessageGenerationPhase());
+ pipeline.addStage(new MessageSelectionStage());
+ pipeline.addStage(new MessageThrottleStage());
+
+ pipeline.handle(event);
+
+ messageOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ messages = messageOutput.getMessages(db, p);
+ Assert.assertEquals(messages.size(), 1);
+
+ toSlaveMessage = messages.get(0);
+ Assert.assertEquals(toSlaveMessage.getTgtName(), newMasterInstance);
+ Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
+ Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
+ }
+
+ private void testP2PMessage(ClusterConfig clusterConfig, Boolean p2pMessageEnabled)
+ throws Exception {
+ Map<String, Resource> resourceMap = getResourceMap(new String[] { db }, numPartition,
+ BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig, null);
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
@@ -90,26 +275,30 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
Pipeline pipeline = createPipeline();
pipeline.handle(event);
- BestPossibleStateOutput bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+ BestPossibleStateOutput bestPossibleStateOutput =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
- CurrentStateOutput currentStateOutput = populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+ CurrentStateOutput currentStateOutput =
+ populateCurrentStateFromBestPossible(bestPossibleStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
Partition p = new Partition(db + "_0");
- String masterInstance =
- getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p), MasterSlaveSMD.States.MASTER.name());
+ String masterInstance = getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+ MasterSlaveSMD.States.MASTER.name());
Assert.assertNotNull(masterInstance);
admin.enableInstance(_clusterName, masterInstance, false);
ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
cache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
+ pipeline = createPipeline();
pipeline.handle(event);
bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
- MessageSelectionStageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ MessageSelectionStageOutput messageOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
List<Message> messages = messageOutput.getMessages(db, p);
Assert.assertEquals(messages.size(), 1);
@@ -121,7 +310,8 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
if (p2pMessageEnabled) {
Assert.assertEquals(message.getRelayMessages().entrySet().size(), 1);
String newMasterInstance =
- getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p), MasterSlaveSMD.States.MASTER.name());
+ getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+ MasterSlaveSMD.States.MASTER.name());
Message relayMessage = message.getRelayMessage(newMasterInstance);
Assert.assertNotNull(relayMessage);
@@ -146,7 +336,8 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
return masterInstance;
}
- private CurrentStateOutput populateCurrentStateFromBestPossible(BestPossibleStateOutput bestPossibleStateOutput) {
+ private CurrentStateOutput populateCurrentStateFromBestPossible(
+ BestPossibleStateOutput bestPossibleStateOutput) {
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
for (String resource : bestPossibleStateOutput.getResourceStatesMap().keySet()) {
PartitionStateMap partitionStateMap = bestPossibleStateOutput.getPartitionStateMap(resource);
[2/3] helix git commit: Fix P2P message logic in controller to avoid
sending duplicated messages to participants.
Posted by lx...@apache.org.
Fix P2P message logic in controller to avoid sending duplicated messages to participants.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/880f8851
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/880f8851
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/880f8851
Branch: refs/heads/master
Commit: 880f885121afecab4e186282fbf94a146a2cf04a
Parents: f1c5037
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Apr 24 18:18:40 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 17 15:08:28 2018 -0700
----------------------------------------------------------------------
.../common/caches/InstanceMessagesCache.java | 146 +++++++--
.../controller/stages/ClusterDataCache.java | 12 +-
.../stages/CurrentStateComputationStage.java | 57 +++-
.../controller/stages/CurrentStateOutput.java | 58 ++--
.../stages/MessageGenerationPhase.java | 17 +-
.../stages/MessageSelectionStage.java | 31 +-
.../controller/stages/TaskAssignmentStage.java | 7 +-
.../messaging/handling/HelixTaskExecutor.java | 22 +-
.../java/org/apache/helix/model/Message.java | 12 +-
.../helix/task/AbstractTaskDispatcher.java | 2 +-
.../helix/task/DeprecatedTaskRebalancer.java | 2 +-
.../FixedTargetTaskAssignmentCalculator.java | 4 +-
.../org/apache/helix/task/JobRebalancer.java | 5 +-
.../ZkHelixClusterVerifier.java | 2 +-
.../org/apache/helix/common/ZkTestBase.java | 34 ++
.../TestCurrentStateComputationStage.java | 2 +-
.../stages/TestMsgSelectionStage.java | 5 +-
.../messaging/TestP2PMessageSemiAuto.java | 68 ++--
.../messaging/TestP2PNoDuplicatedMessage.java | 315 +++++++++++++++++++
.../PartitionMigration/TestExpandCluster.java | 2 -
.../handling/MockHelixTaskExecutor.java | 111 +++++++
.../TestP2PMessagesAvoidDuplicatedMessage.java | 13 +-
.../TestP2PStateTransitionMessages.java | 217 ++++++++++++-
23 files changed, 990 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index f8001da..13b77cc 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -22,7 +22,7 @@ package org.apache.helix.common.caches;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -43,9 +43,16 @@ import org.slf4j.LoggerFactory;
public class InstanceMessagesCache {
private static final Logger LOG = LoggerFactory.getLogger(InstanceMessagesCache.class.getName());
private Map<String, Map<String, Message>> _messageMap;
+ private Map<String, Map<String, Message>> _relayMessageMap;
// maintain a cache of participant messages across pipeline runs
+ // <instance -> {<MessageId, Message>}>
private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+
+ // maintain a set of valid pending P2P messages.
+ // <instance -> {<MessageId, Message>}>
+ private Map<String, Map<String, Message>> _relayMessageCache = Maps.newHashMap();
+
private String _clusterName;
public InstanceMessagesCache(String clusterName) {
@@ -54,17 +61,15 @@ public class InstanceMessagesCache {
/**
* This refreshes all pending messages in the cluster by re-fetching the data from zookeeper in an
- * efficient way
- * current state must be refreshed before refreshing relay messages because we need to use current
- * state to validate all relay messages.
+ * efficient way current state must be refreshed before refreshing relay messages because we need
+ * to use current state to validate all relay messages.
*
* @param accessor
* @param liveInstanceMap
*
* @return
*/
- public boolean refresh(HelixDataAccessor accessor,
- Map<String, LiveInstance> liveInstanceMap) {
+ public boolean refresh(HelixDataAccessor accessor, Map<String, LiveInstance> liveInstanceMap) {
LOG.info("START: InstanceMessagesCache.refresh()");
long startTime = System.currentTimeMillis();
@@ -124,13 +129,16 @@ public class InstanceMessagesCache {
System.currentTimeMillis() - startTime) + " ms.");
}
+ LOG.info("END: InstanceMessagesCache.refresh()");
+
return true;
}
// update all valid relay messages attached to existing state transition messages into message map.
public void updateRelayMessages(Map<String, LiveInstance> liveInstanceMap,
Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
- List<Message> relayMessages = new ArrayList<>();
+
+ // refresh _relayMessageCache
for (String instance : _messageMap.keySet()) {
Map<String, Message> instanceMessages = _messageMap.get(instance);
Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
@@ -170,51 +178,131 @@ public class InstanceMessagesCache {
for (Message relayMsg : message.getRelayMessages().values()) {
relayMsg.setRelayTime(transitionCompleteTime);
- relayMessages.add(relayMsg);
+ cacheRelayMessage(relayMsg);
}
}
}
}
- for (Message message : relayMessages) {
- String instance = message.getTgtName();
- Map<String, Message> instanceMessages = _messageMap.get(instance);
- if (instanceMessages == null) {
- instanceMessages = new HashMap<>();
- _messageMap.put(instance, instanceMessages);
+ Map<String, Map<String, Message>> relayMessageMap = new HashMap<>();
+ // refresh _relayMessageMap
+ for (String instance : _relayMessageCache.keySet()) {
+ Map<String, Message> messages = _relayMessageCache.get(instance);
+ Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+ currentStateMap.get(instance);
+ if (instanceCurrentStateMap == null) {
+ continue;
+ }
+
+ Iterator<Map.Entry<String, Message>> iterator = messages.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Message message = iterator.next().getValue();
+ String sessionId = message.getTgtSessionId();
+ String resourceName = message.getResourceName();
+ String partitionName = message.getPartitionName();
+ String targetState = message.getToState();
+ String instanceSessionId = liveInstanceMap.get(instance).getSessionId();
+
+ if (_messageMap.get(instance).containsKey(message.getMsgId())) {
+ // relay message has already been sent to target host
+ // remove the message from relayMessageCache.
+ LOG.info(
+ "Relay message has already been sent to target host, remove relay message from the cache"
+ + message.getId());
+ iterator.remove();
+ continue;
+ }
+
+ if (!instanceSessionId.equals(sessionId)) {
+ LOG.info(
+ "Instance SessionId does not match, remove relay message from the cache" + message
+ .getId());
+ iterator.remove();
+ continue;
+ }
+
+ Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
+ if (sessionCurrentStateMap == null) {
+ LOG.info("No sessionCurrentStateMap found, ignore relay message from the cache" + message
+ .getId());
+ continue;
+ }
+ CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+ if (currentState != null && targetState.equals(currentState.getState(partitionName))) {
+ LOG.info("CurrentState " + currentState
+ + " match the target state of the relay message, remove relay from cache." + message
+ .getId());
+ iterator.remove();
+ continue;
+ }
+
+ if (message.isExpired()) {
+ LOG.info("relay message " + message.getId() + " expired, remove it from cache."
+ + message.getId());
+ iterator.remove();
+ continue;
+ }
+
+ if (!relayMessageMap.containsKey(instance)) {
+ relayMessageMap.put(instance, Maps.<String, Message>newHashMap());
+ }
+ relayMessageMap.get(instance).put(message.getMsgId(), message);
}
- instanceMessages.put(message.getId(), message);
}
+
+ _relayMessageMap = Collections.unmodifiableMap(relayMessageMap);
}
/**
- * Provides a list of current outstanding transitions on a given instance.
+ * Provides a list of current outstanding pending state transition messages on a given instance.
*
* @param instanceName
*
* @return
*/
public Map<String, Message> getMessages(String instanceName) {
- Map<String, Message> map = _messageMap.get(instanceName);
- if (map != null) {
- return map;
- } else {
- return Collections.emptyMap();
+ if (_messageMap.containsKey(instanceName)) {
+ return _messageMap.get(instanceName);
}
+ return Collections.emptyMap();
}
- public void cacheMessages(List<Message> messages) {
+ /**
+ * Provides a list of current outstanding pending relay (p2p) messages on a given instance.
+ *
+ * @param instanceName
+ *
+ * @return
+ */
+ public Map<String, Message> getRelayMessages(String instanceName) {
+ if (_relayMessageMap.containsKey(instanceName)) {
+ return _relayMessageMap.get(instanceName);
+ }
+ return Collections.emptyMap();
+ }
+
+ public void cacheMessages(Collection<Message> messages) {
for (Message message : messages) {
String instanceName = message.getTgtName();
- Map<String, Message> instMsgMap;
- if (_messageCache.containsKey(instanceName)) {
- instMsgMap = _messageCache.get(instanceName);
- } else {
- instMsgMap = Maps.newHashMap();
- _messageCache.put(instanceName, instMsgMap);
+ if (!_messageCache.containsKey(instanceName)) {
+ _messageCache.put(instanceName, Maps.<String, Message>newHashMap());
}
- instMsgMap.put(message.getId(), message);
+ _messageCache.get(instanceName).put(message.getId(), message);
+
+ if (message.hasRelayMessages()) {
+ for (Message relayMsg : message.getRelayMessages().values()) {
+ cacheRelayMessage(relayMsg);
+ }
+ }
+ }
+ }
+
+ protected void cacheRelayMessage(Message message) {
+ String instanceName = message.getTgtName();
+ if (!_relayMessageCache.containsKey(instanceName)) {
+ _relayMessageCache.put(instanceName, Maps.<String, Message>newHashMap());
}
+ _relayMessageCache.get(instanceName).put(message.getId(), message);
}
@Override public String toString() {
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 3e6bd86..577b2c7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -476,7 +477,16 @@ public class ClusterDataCache {
return _instanceMessagesCache.getMessages(instanceName);
}
- public void cacheMessages(List<Message> messages) {
+ /**
+ * Provides a list of current outstanding pending relay messages on a given instance.
+ * @param instanceName
+ * @return
+ */
+ public Map<String, Message> getRelayMessages(String instanceName) {
+ return _instanceMessagesCache.getRelayMessages(instanceName);
+ }
+
+ public void cacheMessages(Collection<Message> messages) {
_instanceMessagesCache.cacheMessages(messages);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index a56d194..9cc9506 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -66,10 +66,12 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
// update pending messages
Map<String, Message> messages = cache.getMessages(instanceName);
- updatePendingMessages(instance, messages.values(), currentStateOutput, resourceMap);
+ Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
+ updatePendingMessages(instance, messages.values(), currentStateOutput, relayMessages.values(), resourceMap);
// update current states.
- Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName, instanceSessionId);
+ Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName,
+ instanceSessionId);
updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap);
}
@@ -84,7 +86,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
// update all pending messages to CurrentStateOutput.
private void updatePendingMessages(LiveInstance instance, Collection<Message> pendingMessages,
- CurrentStateOutput currentStateOutput, Map<String, Resource> resourceMap) {
+ CurrentStateOutput currentStateOutput, Collection<Message> pendingRelayMessages,
+ Map<String, Resource> resourceMap) {
String instanceName = instance.getInstanceName();
String instanceSessionId = instance.getSessionId();
@@ -100,6 +103,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
String resourceName = message.getResourceName();
Resource resource = resourceMap.get(resourceName);
if (resource == null) {
+ LogUtil.logInfo(LOG, _eventId, String.format(
+ "Ignore a pending relay message %s for a non-exist resource %s and partition %s",
+ message.getMsgId(), resourceName, message.getPartitionName()));
continue;
}
@@ -109,7 +115,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
if (partition != null) {
setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
} else {
- // log
+ LogUtil.logInfo(LOG, _eventId, String
+ .format("Ignore a pending message %s for a non-exist resource %s and partition %s",
+ message.getMsgId(), resourceName, message.getPartitionName()));
}
} else {
List<String> partitionNames = message.getPartitionNames();
@@ -119,12 +127,47 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
if (partition != null) {
setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
} else {
- // log
+ LogUtil.logInfo(LOG, _eventId, String.format(
+ "Ignore a pending message %s for a non-exist resource %s and partition %s",
+ message.getMsgId(), resourceName, message.getPartitionName()));
}
}
}
}
}
+
+
+ // update all pending relay messages
+ for (Message message : pendingRelayMessages) {
+ if (!message.isRelayMessage()) {
+ LogUtil.logWarn(LOG, _eventId,
+ String.format("Not a relay message %s, ignored!", message.getMsgId()));
+ continue;
+ }
+ String resourceName = message.getResourceName();
+ Resource resource = resourceMap.get(resourceName);
+ if (resource == null) {
+ LogUtil.logInfo(LOG, _eventId, String.format(
+ "Ignore a pending relay message %s for a non-exist resource %s and partition %s",
+ message.getMsgId(), resourceName, message.getPartitionName()));
+ continue;
+ }
+
+ if (!message.getBatchMessageMode()) {
+ String partitionName = message.getPartitionName();
+ Partition partition = resource.getPartition(partitionName);
+ if (partition != null) {
+ currentStateOutput.setPendingRelayMessage(resourceName, partition, instanceName, message);
+ } else {
+ LogUtil.logInfo(LOG, _eventId, String.format(
+ "Ignore a pending relay message %s for a non-exist resource %s and partition %s",
+ message.getMsgId(), resourceName, message.getPartitionName()));
+ }
+ } else {
+ LogUtil.logWarn(LOG, _eventId, String
+ .format("A relay message %s should not be batched, ignored!", message.getMsgId()));
+ }
+ }
}
// update current states in CurrentStateOutput
@@ -169,9 +212,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
private void setMessageState(CurrentStateOutput currentStateOutput, String resourceName,
Partition partition, String instanceName, Message message) {
if (MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())) {
- currentStateOutput.setPendingState(resourceName, partition, instanceName, message);
+ currentStateOutput.setPendingMessage(resourceName, partition, instanceName, message);
} else {
- currentStateOutput.setCancellationState(resourceName, partition, instanceName, message);
+ currentStateOutput.setCancellationMessage(resourceName, partition, instanceName, message);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 4ebef97..b634703 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -37,8 +37,9 @@ import com.google.common.collect.Sets;
*/
public class CurrentStateOutput {
private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
- private final Map<String, Map<Partition, Map<String, Message>>> _pendingStateMap;
- private final Map<String, Map<Partition, Map<String, Message>>> _cancellationStateMap;
+ private final Map<String, Map<Partition, Map<String, Message>>> _pendingMessageMap;
+ private final Map<String, Map<Partition, Map<String, Message>>> _cancellationMessageMap;
+ private final Map<String, Map<Partition, Map<String, Message>>> _pendingRelayMessageMap;
// resourceName -> (Partition -> (instanceName -> endTime))
// Note that startTime / endTime in CurrentState marks that of state transition
@@ -61,8 +62,9 @@ public class CurrentStateOutput {
public CurrentStateOutput() {
_currentStateMap = new HashMap<>();
- _pendingStateMap = new HashMap<>();
- _cancellationStateMap = new HashMap<>();
+ _pendingMessageMap = new HashMap<>();
+ _pendingRelayMessageMap = new HashMap<>();
+ _cancellationMessageMap = new HashMap<>();
_currentStateEndTimeMap = new HashMap<>();
_resourceStateModelMap = new HashMap<>();
_curStateMetaMap = new HashMap<>();
@@ -140,9 +142,9 @@ public class CurrentStateOutput {
_infoMap.get(resourceName).get(partition).put(instanceName, state);
}
- public void setPendingState(String resourceName, Partition partition, String instanceName,
+ public void setPendingMessage(String resourceName, Partition partition, String instanceName,
Message message) {
- setStateMessage(resourceName, partition, instanceName, message, _pendingStateMap);
+ setStateMessage(resourceName, partition, instanceName, message, _pendingMessageMap);
}
/**
@@ -152,9 +154,14 @@ public class CurrentStateOutput {
* @param instanceName
* @param message
*/
- public void setCancellationState(String resourceName, Partition partition, String instanceName,
+ public void setCancellationMessage(String resourceName, Partition partition, String instanceName,
Message message) {
- setStateMessage(resourceName, partition, instanceName, message, _cancellationStateMap);
+ setStateMessage(resourceName, partition, instanceName, message, _cancellationMessageMap);
+ }
+
+ public void setPendingRelayMessage(String resourceName, Partition partition, String instanceName,
+ Message message) {
+ setStateMessage(resourceName, partition, instanceName, message, _pendingRelayMessageMap);
}
private void setStateMessage(String resourceName, Partition partition, String instanceName,
@@ -221,15 +228,24 @@ public class CurrentStateOutput {
}
/**
- * given (resource, partition, instance), returns toState
+ * given (resource, partition, instance), returns pending message on this instance.
* @param resourceName
* @param partition
* @param instanceName
* @return pending message
*/
- // TODO: this should return toState, not pending message, create a separate method
- public Message getPendingState(String resourceName, Partition partition, String instanceName) {
- return getStateMessage(resourceName, partition, instanceName, _pendingStateMap);
+ public Message getPendingMessage(String resourceName, Partition partition, String instanceName) {
+ return getStateMessage(resourceName, partition, instanceName, _pendingMessageMap);
+ }
+
+ public Map<String, Message> getPendingRelayMessageMap(String resourceName, Partition partition) {
+ if (_pendingRelayMessageMap.containsKey(resourceName)) {
+ Map<Partition, Map<String, Message>> map = _pendingRelayMessageMap.get(resourceName);
+ if (map.containsKey(partition)) {
+ return map.get(partition);
+ }
+ }
+ return Collections.emptyMap();
}
/**
@@ -239,9 +255,9 @@ public class CurrentStateOutput {
* @param instanceName
* @return
*/
- public Message getCancellationState(String resourceName, Partition partition,
+ public Message getCancellationMessage(String resourceName, Partition partition,
String instanceName) {
- return getStateMessage(resourceName, partition, instanceName, _cancellationStateMap);
+ return getStateMessage(resourceName, partition, instanceName, _cancellationMessageMap);
}
private Message getStateMessage(String resourceName, Partition partition, String instanceName,
@@ -291,8 +307,8 @@ public class CurrentStateOutput {
* @return pending target state map
*/
public Map<String, String> getPendingStateMap(String resourceName, Partition partition) {
- if (_pendingStateMap.containsKey(resourceName)) {
- Map<Partition, Map<String, Message>> map = _pendingStateMap.get(resourceName);
+ if (_pendingMessageMap.containsKey(resourceName)) {
+ Map<Partition, Map<String, Message>> map = _pendingMessageMap.get(resourceName);
if (map.containsKey(partition)) {
Map<String, Message> pendingMsgMap = map.get(partition);
Map<String, String> pendingStateMap = new HashMap<String, String>();
@@ -312,8 +328,8 @@ public class CurrentStateOutput {
* @return pending messages map
*/
public Map<String, Message> getPendingMessageMap(String resourceName, Partition partition) {
- if (_pendingStateMap.containsKey(resourceName)) {
- Map<Partition, Map<String, Message>> map = _pendingStateMap.get(resourceName);
+ if (_pendingMessageMap.containsKey(resourceName)) {
+ Map<Partition, Map<String, Message>> map = _pendingMessageMap.get(resourceName);
if (map.containsKey(partition)) {
return map.get(partition);
}
@@ -328,7 +344,7 @@ public class CurrentStateOutput {
*/
public Set<Partition> getCurrentStateMappedPartitions(String resourceId) {
Map<Partition, Map<String, String>> currentStateMap = _currentStateMap.get(resourceId);
- Map<Partition, Map<String, Message>> pendingStateMap = _pendingStateMap.get(resourceId);
+ Map<Partition, Map<String, Message>> pendingStateMap = _pendingMessageMap.get(resourceId);
Set<Partition> partitionSet = Sets.newHashSet();
if (currentStateMap != null) {
partitionSet.addAll(currentStateMap.keySet());
@@ -346,7 +362,7 @@ public class CurrentStateOutput {
* @return set of participants to partitions mapping
*/
public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
- return getPartitionCountWithState(resourceStateModel, state, (Map) _pendingStateMap);
+ return getPartitionCountWithState(resourceStateModel, state, (Map) _pendingMessageMap);
}
/**
@@ -397,7 +413,7 @@ public class CurrentStateOutput {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("current state= ").append(_currentStateMap);
- sb.append(", pending state= ").append(_pendingStateMap);
+ sb.append(", pending state= ").append(_pendingMessageMap);
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index e21c607..3abc965 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -82,7 +82,7 @@ public class MessageGenerationPhase extends AbstractBaseStage {
}
Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- Map<String, String> sessionIdMap = new HashMap<String, String>();
+ Map<String, String> sessionIdMap = new HashMap<>();
for (LiveInstance liveInstance : liveInstances.values()) {
sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
@@ -101,7 +101,7 @@ public class MessageGenerationPhase extends AbstractBaseStage {
for (Partition partition : resource.getPartitions()) {
- Map<String, String> instanceStateMap = new HashMap<String, String>(
+ Map<String, String> instanceStateMap = new HashMap<>(
intermediateStateOutput.getInstanceStateMap(resourceName, partition));
Map<String, String> pendingStateMap =
currentStateOutput.getPendingStateMap(resourceName, partition);
@@ -120,24 +120,27 @@ public class MessageGenerationPhase extends AbstractBaseStage {
// we should generate message based on the desired-state priority
// so keep generated messages in a temp map keyed by state
// desired-state->list of generated-messages
- Map<String, List<Message>> messageMap = new HashMap<String, List<Message>>();
+ Map<String, List<Message>> messageMap = new HashMap<>();
for (String instanceName : instanceStateMap.keySet()) {
String desiredState = instanceStateMap.get(instanceName);
- String currentState = currentStateOutput.getCurrentState(resourceName, partition, instanceName);
+ String currentState = currentStateOutput.getCurrentState(resourceName, partition,
+ instanceName);
if (currentState == null) {
currentState = stateModelDef.getInitialState();
}
- Message pendingMessage = currentStateOutput.getPendingState(resourceName, partition, instanceName);
+ Message pendingMessage = currentStateOutput.getPendingMessage(resourceName, partition,
+ instanceName);
boolean isCancellationEnabled = cache.getClusterConfig().isStateTransitionCancelEnabled();
- Message cancellationMessage = currentStateOutput.getCancellationState(resourceName, partition, instanceName);
+ Message cancellationMessage = currentStateOutput.getCancellationMessage(resourceName,
+ partition, instanceName);
String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
Message message = null;
- if (shouldCleanUpPendingMessage(pendingMessage, currentState,
+ if (pendingMessage != null && shouldCleanUpPendingMessage(pendingMessage, currentState,
currentStateOutput.getEndTime(resourceName, partition, instanceName))) {
LogUtil.logInfo(logger, _eventId, String.format(
"Adding pending message %s on instance %s to clean up. Msg: %s->%s, current state of resource %s:%s is %s",
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index a061598..03838f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -20,11 +20,13 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.controller.LogUtil;
@@ -60,6 +62,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
public void process(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+
Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
CurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.name());
@@ -86,6 +89,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
List<Message> selectedMessages = selectMessages(cache.getLiveInstances(),
currentStateOutput.getCurrentStateMap(resourceName, partition),
currentStateOutput.getPendingMessageMap(resourceName, partition), messages,
+ currentStateOutput.getPendingRelayMessageMap(resourceName, partition).values(),
stateConstraints, stateTransitionPriorities, stateModelDef,
resource.isP2PMessageEnabled());
output.addMessages(resourceName, partition, selectedMessages);
@@ -127,9 +131,9 @@ public class MessageSelectionStage extends AbstractBaseStage {
*/
List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
Map<String, String> currentStates, Map<String, Message> pendingMessages,
- List<Message> messages, Map<String, Bounds> stateConstraints,
- final Map<String, Integer> stateTransitionPriorities, StateModelDefinition stateModelDef,
- boolean p2pMessageEnabled) {
+ List<Message> messages, Collection<Message> pendingRelayMessages,
+ Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
+ StateModelDefinition stateModelDef, boolean p2pMessageEnabled) {
if (messages == null || messages.isEmpty()) {
return Collections.emptyList();
}
@@ -188,6 +192,27 @@ public class MessageSelectionStage extends AbstractBaseStage {
for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
for (Message message : messageList) {
String toState = message.getToState();
+ String fromState = message.getFromState();
+
+ if (toState.equals(stateModelDef.getTopState())) {
+ // find if there are any pending relay messages match this message.
+ // if yes, rebuild the message to use the same message id from the original relay message.
+ for (Message relayMsg : pendingRelayMessages) {
+ if (relayMsg.getToState().equals(toState) && relayMsg.getFromState()
+ .equals(fromState)) {
+ if (relayMsg.getTgtName().equals(message.getTgtName())) {
+ message = new Message(message, relayMsg.getMsgId());
+ } else {
+ // if there are pending relay message that was sent to a different host than the current message
+ // we should not send the toState message now.
+ LOG.info(
+ "There is pending relay message to a different host, not send message: {}, pending relay message: {}",
+ message, relayMsg);
+ continue;
+ }
+ }
+ }
+ }
if (stateConstraints.containsKey(toState)) {
int newCnt = (stateCnts.containsKey(toState) ? stateCnts.get(toState) + 1 : 1);
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index c196a26..6036f34 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -160,6 +160,11 @@ public class TaskAssignmentStage extends AbstractBaseStage {
keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
}
- dataAccessor.createChildren(keys, new ArrayList<>(messages));
+ boolean[] results = dataAccessor.createChildren(keys, new ArrayList<>(messages));
+ for (int i = 0; i < results.length; i++) {
+ if (!results[i]) {
+ LogUtil.logWarn(logger, _eventId, "Failed to send message: " + keys.get(i));
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 9734cc8..925f127 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -488,8 +488,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
return true;
} else {
_statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
- "fail to cancel task: " + taskId,
- notificationContext.getManager());
+ "fail to cancel task: " + taskId, notificationContext.getManager());
}
} else {
_statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
@@ -789,14 +788,6 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
continue;
}
- if (message.isExpired()) {
- LOG.info(
- "Dropping expired message. mid: " + message.getId() + ", from: " + message.getMsgSrc() + " relayed from: "
- + message.getRelaySrcHost());
- reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
- continue;
- }
-
String tgtSessionId = message.getTgtSessionId();
// sessionId mismatch normally means message comes from expired session, just remove it
if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
@@ -843,6 +834,14 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
continue;
}
+ if (message.isExpired()) {
+ LOG.info(
+ "Dropping expired message. mid: " + message.getId() + ", from: " + message.getMsgSrc()
+ + " relayed from: " + message.getRelaySrcHost());
+ reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
+ continue;
+ }
+
// State Transition Cancellation
if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
boolean success = cancelNotStartedStateTransition(message, stateTransitionHandlers, accessor, instanceName);
@@ -874,6 +873,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
duplicatedMessage.getToState(), message.getFromState(), message.getToState()));
} else if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
&& isStateTransitionInProgress(messageTarget)) {
+
// If there is another state transition for same partition is going on,
// discard the message. Controller will resend if this is a valid message
throw new HelixException(String
@@ -1095,7 +1095,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
&& stateTranstionMessage.getToState().equalsIgnoreCase(cancellationMessage.getToState());
}
- private String getMessageTarget(String resourceName, String partitionName) {
+ String getMessageTarget(String resourceName, String partitionName) {
return String.format("%s_%s", resourceName, partitionName);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 8195092..51d03cb 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -178,6 +178,16 @@ public class Message extends HelixProperty {
}
/**
+ * Instantiate a message with a new id
+ * @param message message to be copied
+ * @param id unique message identifier
+ */
+ public Message(Message message, String id) {
+ super(new ZNRecord(message.getRecord(), id));
+ setMsgId(id);
+ }
+
+ /**
* Set a subtype of the message
* @param subType name of the subtype
*/
@@ -820,7 +830,7 @@ public class Message extends HelixProperty {
// use relay time if this is a relay message
if (isRelayMessage()) {
long relayTime = getRelayTime();
- return relayTime <= 0 || (relayTime + expiry < current);
+ return (relayTime > 0 && (relayTime + expiry < current));
}
return getCreateTimeStamp() + expiry < current;
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index eb67d59..e230fb5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -65,7 +65,7 @@ public abstract class AbstractTaskDispatcher {
// Check for pending state transitions on this (partition, instance).
Message pendingMessage =
- currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
+ currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {
processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance,
pendingMessage, jobState, currState, paMap, assignedPartitions);
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 97e02fc..9903117 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -366,7 +366,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
// Check for pending state transitions on this (partition, instance).
Message pendingMessage =
- currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
+ currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
if (pendingMessage != null) {
// There is a pending state transition for this (partition, instance). Just copy forward
// the state assignment from the previous ideal state.
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 87b57c7..4389484 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -156,7 +156,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
int pId = partitions.get(0);
if (includeSet.contains(pId)) {
for (String instance : instances) {
- Message pendingMessage = currStateOutput.getPendingState(tgtIs.getResourceName(),
+ Message pendingMessage = currStateOutput.getPendingMessage(tgtIs.getResourceName(),
new Partition(pName), instance);
if (pendingMessage != null) {
continue;
@@ -230,7 +230,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
// If there is, we should wait until the pending message gets processed, so skip
// assignment this time around
Message pendingMessage =
- currStateOutput.getPendingState(targetIdealState.getResourceName(),
+ currStateOutput.getPendingMessage(targetIdealState.getResourceName(),
new Partition(targetResourcePartitionName), instance);
if (pendingMessage != null) {
continue;
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index ddda41a..5b29c23 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -270,7 +270,7 @@ public class JobRebalancer extends TaskRebalancer {
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.TASK_ABORTED.name()));
}
Partition partition = new Partition(pName(jobResource, pId));
- Message pendingMessage = currStateOutput.getPendingState(jobResource, partition, instance);
+ Message pendingMessage = currStateOutput.getPendingMessage(jobResource, partition, instance);
// While job is failing, if the task is pending on INIT->RUNNING, set it back to INIT,
// so that Helix will cancel the transition.
if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT && pendingMessage != null) {
@@ -337,7 +337,8 @@ public class JobRebalancer extends TaskRebalancer {
TaskPartitionState state = jobContext.getPartitionState(pId);
Partition partition = new Partition(pName(jobResource, pId));
String instance = jobContext.getAssignedParticipant(pId);
- Message pendingMessage = currentStateOutput.getPendingState(jobResource, partition, instance);
+ Message pendingMessage = currentStateOutput.getPendingMessage(jobResource, partition,
+ instance);
// If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished
if (state == TaskPartitionState.RUNNING
|| (state == TaskPartitionState.INIT && pendingMessage != null)) {
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 9d8e63d..dbf9272 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
public abstract class ZkHelixClusterVerifier
implements IZkChildListener, IZkDataListener, HelixClusterVerifier {
private static Logger LOG = LoggerFactory.getLogger(ZkHelixClusterVerifier.class);
- protected static int DEFAULT_TIMEOUT = 30 * 1000;
+ protected static int DEFAULT_TIMEOUT = 300 * 1000;
protected static int DEFAULT_PERIOD = 100;
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 189e95c..c69744e 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -42,6 +42,7 @@ import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.HelixConfigProperty;
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.Stage;
@@ -65,6 +66,7 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.tools.ClusterSetup;
@@ -240,6 +242,38 @@ public class ZkTestBase {
configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig);
}
+ protected void enableP2PInCluster(String clusterName, ConfigAccessor configAccessor,
+ boolean enable) {
+ // enable p2p message in cluster.
+ if (enable) {
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+ clusterConfig.enableP2PMessage(true);
+ configAccessor.setClusterConfig(clusterName, clusterConfig);
+ } else {
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+ clusterConfig.getRecord().getSimpleFields()
+ .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+ configAccessor.setClusterConfig(clusterName, clusterConfig);
+ }
+ }
+
+ protected void enableP2PInResource(String clusterName, ConfigAccessor configAccessor,
+ String dbName, boolean enable) {
+ if (enable) {
+ ResourceConfig resourceConfig =
+ new ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build();
+ configAccessor.setResourceConfig(clusterName, dbName, resourceConfig);
+ } else {
+ // remove P2P Message in resource config
+ ResourceConfig resourceConfig = configAccessor.getResourceConfig(clusterName, dbName);
+ if (resourceConfig != null) {
+ resourceConfig.getRecord().getSimpleFields()
+ .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+ configAccessor.setResourceConfig(clusterName, dbName, resourceConfig);
+ }
+ }
+ }
+
protected void setDelayTimeInCluster(ZkClient zkClient, String clusterName, long delay) {
ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 4479cf6..06fbf24 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -78,7 +78,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
runStage(event, stage);
CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.name());
String pendingState =
- output2.getPendingState("testResourceName", new Partition("testResourceName_1"),
+ output2.getPendingMessage("testResourceName", new Partition("testResourceName_1"),
"localhost_3").getToState();
AssertJUnit.assertEquals(pendingState, "SLAVE");
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index 79e4e2c..45e1062 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -85,7 +86,7 @@ public class TestMsgSelectionStage {
List<Message> selectedMsg =
new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingMessages,
- messages, stateConstraints, stateTransitionPriorities,
+ messages, Collections.<Message>emptyList(), stateConstraints, stateTransitionPriorities,
BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false);
Assert.assertEquals(selectedMsg.size(), 1);
@@ -123,7 +124,7 @@ public class TestMsgSelectionStage {
List<Message> selectedMsg =
new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingMessages,
- messages, stateConstraints, stateTransitionPriorities,
+ messages, Collections.<Message>emptyList(), stateConstraints, stateTransitionPriorities,
BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false);
Assert.assertEquals(selectedMsg.size(), 0);
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
index 551ea78..3c95c2f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.api.config.HelixConfigProperty;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.integration.DelayedTransitionBase;
@@ -33,11 +32,9 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
-import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MasterSlaveSMD;
-import org.apache.helix.model.ResourceConfig;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
@@ -55,7 +52,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
static final String DB_NAME_1 = "TestDB_1";
static final String DB_NAME_2 = "TestDB_2";
- static final int PARTITION_NUMBER = 20;
+ static final int PARTITION_NUMBER = 200;
static final int REPLICA_NUMBER = 3;
List<MockParticipantManager> _participants = new ArrayList<>();
@@ -126,23 +123,25 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
- verifyP2PMessage(DB_NAME_1,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
- verifyP2PMessage(DB_NAME_2,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+ verifyP2PMessage(DB_NAME_1,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
+ verifyP2PMessage(DB_NAME_2,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
//re-enable the old master
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
- verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
- verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+ verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(),
+ _controller.getInstanceName(), 1);
+ verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(),
+ _controller.getInstanceName(), 1);
}
@Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
public void testP2PStateTransitionEnabledInCluster() {
- enableP2PInCluster(true);
- enableP2PInResource(DB_NAME_1,false);
- enableP2PInResource(DB_NAME_2,false);
+ enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
+ enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_1,false);
+ enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_2,false);
// disable the master instance
String prevMasterInstance = _instances.get(0);
@@ -156,15 +155,17 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
- verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
- verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
+ verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(
+ 1));
+ verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(
+ 1));
}
@Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
public void testP2PStateTransitionEnabledInResource() {
- enableP2PInCluster(false);
- enableP2PInResource(DB_NAME_1,true);
- enableP2PInResource(DB_NAME_2,false);
+ enableP2PInCluster(CLUSTER_NAME, _configAccessor, false);
+ enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_1,true);
+ enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_2,false);
// disable the master instance
@@ -173,7 +174,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
verifyP2PMessage(DB_NAME_1, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), prevMasterInstance);
- verifyP2PMessage(DB_NAME_2, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+ verifyP2PMessage(DB_NAME_2, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
//re-enable the old master
@@ -181,37 +182,14 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
- verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
- }
-
- private void enableP2PInCluster(boolean enable) {
- // enable p2p message in cluster.
- if (enable) {
- ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
- clusterConfig.enableP2PMessage(true);
- _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
- } else {
- ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
- clusterConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
- _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
- }
+ verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
}
- private void enableP2PInResource(String dbName, boolean enable) {
- if (enable) {
- ResourceConfig resourceConfig = new ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build();
- _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig);
- } else {
- // remove P2P Message in resource config
- ResourceConfig resourceConfig = _configAccessor.getResourceConfig(CLUSTER_NAME, dbName);
- if (resourceConfig != null) {
- resourceConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
- _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig);
- }
- }
+ private void verifyP2PMessage(String dbName, String instance, String expectedState, String expectedTriggerHost) {
+ verifyP2PMessage(dbName, instance, expectedState, expectedTriggerHost, 0.7);
}
- private void verifyP2PMessage(String dbName, String instance, String expectedState, String expectedTriggerHost) {
+ private void verifyP2PMessage(String dbName, String instance, String expectedState, String expectedTriggerHost, double expectedRatio) {
ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME);
dataCache.refresh(_accessor);
@@ -239,7 +217,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
}
double ratio = ((double) expectedHost) / ((double) total);
- Assert.assertTrue(ratio >= 0.7, String
+ Assert.assertTrue(ratio >= expectedRatio, String
.format("Only %d out of %d percent transitions to Master were triggered by expected host!",
expectedHost, total));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
new file mode 100644
index 0000000..b3ef3e5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -0,0 +1,315 @@
+package org.apache.helix.integration.messaging;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.DelayedTransitionBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MockHelixTaskExecutor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestP2PNoDuplicatedMessage extends ZkTestBase {
+ private static Logger logger = LoggerFactory.getLogger(TestP2PNoDuplicatedMessage.class);
+
+ final String CLASS_NAME = getShortClassName();
+ final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+ static final int PARTICIPANT_NUMBER = 6;
+ static final int PARTICIPANT_START_PORT = 12918;
+
+ static final int DB_COUNT = 2;
+
+ static final int PARTITION_NUMBER = 50;
+ static final int REPLICA_NUMBER = 3;
+
+ final String _controllerName = CONTROLLER_PREFIX + "_0";
+
+ List<MockParticipantManager> _participants = new ArrayList<>();
+ List<String> _instances = new ArrayList<>();
+ ClusterControllerManager _controller;
+
+ ZkHelixClusterVerifier _clusterVerifier;
+ ConfigAccessor _configAccessor;
+ HelixDataAccessor _accessor;
+
+ @BeforeClass
+ public void beforeClass()
+ throws InterruptedException {
+ System.out.println(
+ "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+
+ // setup storage cluster
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+ for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+ String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i);
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+ _instances.add(instance);
+ }
+
+ // start dummy participants
+ for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+ MockParticipantManager participant =
+ new TestParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
+ participant.setTransition(new DelayedTransitionBase(100));
+ participant.syncStart();
+ _participants.add(participant);
+ }
+
+ enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+ enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+ for (int i = 0; i < DB_COUNT; i++) {
+ createResourceWithDelayedRebalance(CLUSTER_NAME, "TestDB_" + i,
+ BuiltInStateModelDefinitions.MasterSlave.name(), PARTITION_NUMBER, REPLICA_NUMBER,
+ REPLICA_NUMBER - 1, 1000000L, CrushEdRebalanceStrategy.class.getName());
+ }
+
+ // start controller
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, _controllerName);
+ _controller.syncStart();
+
+ _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ _configAccessor = new ConfigAccessor(_gZkClient);
+ _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ _controller.syncStop();
+ for (MockParticipantManager p : _participants) {
+ if (p.isConnected()) {
+ p.syncStop();
+ }
+ }
+ deleteCluster(CLUSTER_NAME);
+ System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testP2PStateTransitionDisabled() throws InterruptedException {
+ enableP2PInCluster(CLUSTER_NAME, _configAccessor, false);
+
+ MockHelixTaskExecutor.resetStats();
+ // rolling upgrade the cluster
+ for (String ins : _instances) {
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, false);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ verifyP2PDisabled();
+
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, true);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ verifyP2PDisabled();
+ }
+
+ Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
+ "There are duplicated transition messages sent while participant is handling the state-transition!");
+ Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,
+ "There are duplicated transition messages sent at same time!");
+ }
+
+ @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
+ public void testP2PStateTransitionEnabled() throws InterruptedException {
+ enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
+ long startTime = System.currentTimeMillis();
+ MockHelixTaskExecutor.resetStats();
+ // rolling upgrade the cluster
+ for (String ins : _instances) {
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, false);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ verifyP2PEnabled(startTime);
+
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, true);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ verifyP2PEnabled(startTime);
+ }
+
+ double ratio = ((double) p2pTrigged) / ((double) total);
+ Assert.assertTrue(ratio > 0.7, String
+ .format("Only %d out of %d percent transitions to Master were triggered by expected host!",
+ p2pTrigged, total));
+
+ Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
+ "There are duplicated transition messages sent while participant is handling the state-transition!");
+ Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,
+ "There are duplicated transition messages sent at same time!");
+ }
+
+ private void verifyP2PDisabled() {
+ ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME);
+ dataCache.refresh(_accessor);
+ Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+
+ for (LiveInstance instance : liveInstanceMap.values()) {
+ Map<String, CurrentState> currentStateMap =
+ dataCache.getCurrentState(instance.getInstanceName(), instance.getSessionId());
+ Assert.assertNotNull(currentStateMap);
+ for (CurrentState currentState : currentStateMap.values()) {
+ for (String partition : currentState.getPartitionStateMap().keySet()) {
+ String state = currentState.getState(partition);
+ if (state.equalsIgnoreCase("MASTER")) {
+ String triggerHost = currentState.getTriggerHost(partition);
+ Assert.assertEquals(triggerHost, _controllerName,
+ state + " of " + partition + " on " + instance.getInstanceName()
+ + " was triggered by " + triggerHost);
+ }
+ }
+ }
+ }
+ }
+
+ static int total = 0;
+ static int p2pTrigged = 0;
+
+ private void verifyP2PEnabled(long startTime) {
+ ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME);
+ dataCache.refresh(_accessor);
+ Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+
+ for (LiveInstance instance : liveInstanceMap.values()) {
+ Map<String, CurrentState> currentStateMap =
+ dataCache.getCurrentState(instance.getInstanceName(), instance.getSessionId());
+ Assert.assertNotNull(currentStateMap);
+ for (CurrentState currentState : currentStateMap.values()) {
+ for (String partition : currentState.getPartitionStateMap().keySet()) {
+ String state = currentState.getState(partition);
+ long start = currentState.getStartTime(partition);
+ if (state.equalsIgnoreCase("MASTER") && start > startTime) {
+ String triggerHost = currentState.getTriggerHost(partition);
+ if (!triggerHost.equals(_controllerName)) {
+ p2pTrigged ++;
+ }
+ total ++;
+ }
+ }
+ }
+ }
+ }
+
+
+ static class TestParticipantManager extends MockParticipantManager {
+ private final DefaultMessagingService _messagingService;
+
+ public TestParticipantManager(String zkAddr, String clusterName, String instanceName) {
+ super(zkAddr, clusterName, instanceName);
+ _messagingService = new MockMessagingService(this);
+ }
+
+ @Override
+ public ClusterMessagingService getMessagingService() {
+ // The caller can register message handler factories on messaging service before the
+ // helix manager is connected. Thus we do not do connected check here.
+ return _messagingService;
+ }
+ }
+
+ static class MockMessagingService extends DefaultMessagingService {
+ private final HelixTaskExecutor _taskExecutor;
+ ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded =
+ new ConcurrentHashMap<>();
+ private final HelixManager _manager;
+
+ public MockMessagingService(HelixManager manager) {
+ super(manager);
+ _manager = manager;
+
+ boolean isParticipant = false;
+ if (manager.getInstanceType() == InstanceType.PARTICIPANT
+ || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
+ isParticipant = true;
+ }
+
+ _taskExecutor = new MockHelixTaskExecutor(
+ new ParticipantStatusMonitor(isParticipant, manager.getInstanceName()),
+ new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName()));
+ }
+
+ @Override
+ public synchronized void registerMessageHandlerFactory(String type,
+ MessageHandlerFactory factory) {
+ registerMessageHandlerFactory(Collections.singletonList(type), factory);
+ }
+
+ @Override
+ public synchronized void registerMessageHandlerFactory(List<String> types,
+ MessageHandlerFactory factory) {
+ if (_manager.isConnected()) {
+ for (String type : types) {
+ registerMessageHandlerFactoryExtended(type, factory);
+ }
+ } else {
+ for (String type : types) {
+ _messageHandlerFactoriestobeAdded.put(type, factory);
+ }
+ }
+ }
+
+ public synchronized void onConnected() {
+ for (String type : _messageHandlerFactoriestobeAdded.keySet()) {
+ registerMessageHandlerFactoryExtended(type, _messageHandlerFactoriestobeAdded.get(type));
+ }
+ _messageHandlerFactoriestobeAdded.clear();
+ }
+
+ public HelixTaskExecutor getExecutor() {
+ return _taskExecutor;
+ }
+
+
+ void registerMessageHandlerFactoryExtended(String type, MessageHandlerFactory factory) {
+ int threadpoolSize = HelixTaskExecutor.DEFAULT_PARALLEL_TASKS;
+ _taskExecutor.registerMessageHandlerFactory(type, factory, threadpoolSize);
+ super.sendNopMessage();
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
index 4083988..83893c6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
@@ -98,8 +98,6 @@ public class TestExpandCluster extends TestPartitionMigrationBase {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
Assert.assertFalse(_migrationVerifier.hasLessReplica());
- Assert.assertFalse(_migrationVerifier.hasMoreReplica());
-
_migrationVerifier.stop();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
new file mode 100644
index 0000000..207f74c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
@@ -0,0 +1,111 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
+
+public class MockHelixTaskExecutor extends HelixTaskExecutor {
+ public static int duplicatedMessages = 0;
+ public static int extraStateTransition = 0;
+ public static int duplicatedMessagesInProgress = 0;
+ HelixManager manager;
+
+ public MockHelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor,
+ MessageQueueMonitor messageQueueMonitor) {
+ super(participantStatusMonitor, messageQueueMonitor);
+ }
+
+ @Override
+ public void onMessage(String instanceName, List<Message> messages,
+ NotificationContext changeContext) {
+ manager = changeContext.getManager();
+ checkDuplicatedMessages(messages);
+ super.onMessage(instanceName, messages, changeContext);
+ }
+
+ void checkDuplicatedMessages(List<Message> messages) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey path = keyBuilder.currentStates(manager.getInstanceName(), manager.getSessionId());
+ Map<String, CurrentState> currentStateMap = accessor.getChildValuesMap(path);
+
+ Set<String> seenPartitions = new HashSet<>();
+ for (Message message : messages) {
+ if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
+ String resource = message.getResourceName();
+ String partition = message.getPartitionName();
+
+ //System.err.println(message.getMsgId());
+ String key = resource + "-" + partition;
+ if (seenPartitions.contains(key)) {
+ //System.err.println("Duplicated message received for " + resource + ":" + partition);
+ duplicatedMessages++;
+ }
+ seenPartitions.add(key);
+
+ String toState = message.getToState();
+ String state = null;
+ if (currentStateMap.containsKey(resource)) {
+ CurrentState currentState = currentStateMap.get(resource);
+ state = currentState.getState(partition);
+ }
+
+ if (toState.equals(state) && message.getMsgState() == Message.MessageState.NEW) {
+ // logger.error(
+ // "Extra message: " + message.getMsgId() + ", Partition is already in target state "
+ // + toState + " for " + resource + ":" + partition);
+ extraStateTransition++;
+ }
+
+ String messageTarget =
+ getMessageTarget(message.getResourceName(), message.getPartitionName());
+
+ if (message.getMsgState() == Message.MessageState.NEW &&
+ _messageTaskMap.containsKey(messageTarget)) {
+ String taskId = _messageTaskMap.get(messageTarget);
+ MessageTaskInfo messageTaskInfo = _taskMap.get(taskId);
+ Message existingMsg = messageTaskInfo.getTask().getMessage();
+ if (existingMsg.getMsgId() != message.getMsgId())
+ // logger.error("Duplicated message In Progress: " + message.getMsgId()
+ // + ", state transition in progress with message " + existingMsg.getMsgId()
+ // + " to " + toState + " for " + resource + ":" + partition);
+ duplicatedMessagesInProgress ++;
+ }
+ }
+ }
+ }
+
+ public static void resetStats() {
+ duplicatedMessages = 0;
+ extraStateTransition = 0;
+ duplicatedMessagesInProgress = 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index d5e40be..bc1e554 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -32,6 +32,11 @@ import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.IntermediateStateOutput;
import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageSelectionStageOutput;
@@ -45,6 +50,7 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
@@ -156,7 +162,8 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
// Validate: Controller should not send S->M message to new master.
currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
- currentStateOutput.setPendingState(_db, _partition, initialMaster, toSlaveMessage);
+ currentStateOutput.setPendingMessage(_db, _partition, initialMaster, toSlaveMessage);
+ currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
@@ -175,7 +182,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
currentStateOutput =
populateCurrentStateFromBestPossible(_bestpossibleState);
currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
- currentStateOutput.setPendingState(_db, _partition, secondMaster, relayMessage);
+ currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
_fullPipeline.handle(event);
@@ -221,7 +228,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
// The initial master has forwarded the p2p message to secondMaster and deleted original M->S message on initialMaster,
// But the S->M state-transition has not completed yet in secondMaster.
// Validate: Controller should not send S->M to thirdMaster.
- currentStateOutput.setPendingState(_db, _partition, secondMaster, relayMessage);
+ currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
[3/3] helix git commit: Temorary workaround to fix P2P race-condition
for old helix participant (0.8.1 or older).
Posted by lx...@apache.org.
Temorary workaround to fix P2P race-condition for old helix participant (0.8.1 or older).
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/74145e8a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/74145e8a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/74145e8a
Branch: refs/heads/master
Commit: 74145e8ad3b34753186d53526bab825de4432c31
Parents: 880f885
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Jul 27 17:28:17 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 17 15:17:26 2018 -0700
----------------------------------------------------------------------
.../common/caches/InstanceMessagesCache.java | 98 ++++++++++++++++++--
.../stages/CurrentStateComputationStage.java | 9 +-
.../controller/stages/TaskAssignmentStage.java | 18 +++-
.../messaging/handling/HelixTaskExecutor.java | 12 ++-
.../messaging/TestP2PNoDuplicatedMessage.java | 13 ++-
5 files changed, 120 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index 13b77cc..9fa9136 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -34,6 +34,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
+import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,10 +54,22 @@ public class InstanceMessagesCache {
// <instance -> {<MessageId, Message>}>
private Map<String, Map<String, Message>> _relayMessageCache = Maps.newHashMap();
+
+ // TODO: Temporary workaround to void participant receiving duplicated state transition messages when p2p is enable.
+ // should remove this once all clients are migrated to 0.8.2. -- Lei
+ private Map<String, Message> _committedRelayMessages = Maps.newHashMap();
+
+ public static final String COMMIT_MESSAGE_EXPIRY_CONFIG = "helix.controller.messagecache.commitmessageexpiry";
+
+ private static final int DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY = 20 * 1000; // 20 seconds
+ private final int _commitMessageExpiry;
+
private String _clusterName;
public InstanceMessagesCache(String clusterName) {
_clusterName = clusterName;
+ _commitMessageExpiry = HelixUtil
+ .getSystemPropertyAsInt(COMMIT_MESSAGE_EXPIRY_CONFIG, DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY);
}
/**
@@ -203,14 +216,27 @@ public class InstanceMessagesCache {
String targetState = message.getToState();
String instanceSessionId = liveInstanceMap.get(instance).getSessionId();
- if (_messageMap.get(instance).containsKey(message.getMsgId())) {
- // relay message has already been sent to target host
- // remove the message from relayMessageCache.
- LOG.info(
- "Relay message has already been sent to target host, remove relay message from the cache"
- + message.getId());
- iterator.remove();
- continue;
+ Map<String, Message> instanceMsgMap = _messageMap.get(instance);
+
+ if (instanceMsgMap != null && instanceMsgMap.containsKey(message.getMsgId())) {
+ Message commitMessage = instanceMsgMap.get(message.getMsgId());
+
+ if (!commitMessage.isRelayMessage()) {
+ LOG.info(
+ "Controller already sent the message to the target host, remove relay message from the cache"
+ + message.getId());
+ iterator.remove();
+ _committedRelayMessages.remove(message.getMsgId());
+ continue;
+ } else {
+ // relay message has already been sent to target host
+ // remember when the relay messages get relayed to the target host.
+ if (!_committedRelayMessages.containsKey(message.getMsgId())) {
+ message.setRelayTime(System.currentTimeMillis());
+ _committedRelayMessages.put(message.getMsgId(), message);
+ LOG.info("Put message into committed relay messages " + message.getId());
+ }
+ }
}
if (!instanceSessionId.equals(sessionId)) {
@@ -227,6 +253,7 @@ public class InstanceMessagesCache {
.getId());
continue;
}
+
CurrentState currentState = sessionCurrentStateMap.get(resourceName);
if (currentState != null && targetState.equals(currentState.getState(partitionName))) {
LOG.info("CurrentState " + currentState
@@ -237,8 +264,8 @@ public class InstanceMessagesCache {
}
if (message.isExpired()) {
- LOG.info("relay message " + message.getId() + " expired, remove it from cache."
- + message.getId());
+ LOG.error("relay message has not been sent " + message.getId()
+ + " expired, remove it from cache. relay time: " + message.getRelayTime());
iterator.remove();
continue;
}
@@ -251,6 +278,55 @@ public class InstanceMessagesCache {
}
_relayMessageMap = Collections.unmodifiableMap(relayMessageMap);
+
+ // TODO: this is a workaround, remove this once the participants are all in 0.8.2,
+ checkCommittedRelayMessages(currentStateMap);
+
+ }
+
+ // TODO: this is a workaround, once the participants are all in 0.8.2,
+ private void checkCommittedRelayMessages(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
+ Iterator<Map.Entry<String, Message>> it = _committedRelayMessages.entrySet().iterator();
+ while (it.hasNext()) {
+ Message message = it.next().getValue();
+
+ String resourceName = message.getResourceName();
+ String partitionName = message.getPartitionName();
+ String targetState = message.getToState();
+ String instance = message.getTgtName();
+ String sessionId = message.getTgtSessionId();
+
+ long committedTime = message.getRelayTime();
+ if (committedTime + _commitMessageExpiry < System.currentTimeMillis()) {
+ LOG.info("relay message " + message.getMsgId()
+ + " is expired after committed, remove it from committed message cache.");
+ it.remove();
+ continue;
+ }
+
+ Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+ currentStateMap.get(instance);
+ if (instanceCurrentStateMap == null || instanceCurrentStateMap.get(sessionId) == null) {
+ LOG.info(
+ "No sessionCurrentStateMap found, remove it from committed message cache." + message
+ .getId());
+ it.remove();
+ continue;
+ }
+
+ Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
+ CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+ if (currentState != null && targetState.equals(currentState.getState(partitionName))) {
+ LOG.info("CurrentState " + currentState
+ + " match the target state of the relay message, remove it from committed message cache."
+ + message.getId());
+ it.remove();
+ continue;
+ }
+
+ Map<String, Message> cachedMap = _messageMap.get(message.getTgtName());
+ cachedMap.put(message.getId(), message);
+ }
}
/**
@@ -303,6 +379,8 @@ public class InstanceMessagesCache {
_relayMessageCache.put(instanceName, Maps.<String, Message>newHashMap());
}
_relayMessageCache.get(instanceName).put(message.getId(), message);
+
+ LOG.info("Add message to relay cache " + message.getMsgId());
}
@Override public String toString() {
http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 9cc9506..340a051 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -164,8 +164,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
message.getMsgId(), resourceName, message.getPartitionName()));
}
} else {
- LogUtil.logWarn(LOG, _eventId, String
- .format("A relay message %s should not be batched, ignored!", message.getMsgId()));
+ LogUtil.logWarn(LOG, _eventId,
+ String.format("A relay message %s should not be batched, ignored!", message.getMsgId()));
}
}
}
@@ -275,8 +275,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
// Check whether it is already passed threshold
for (String resourceName : missingTopStateMap.keySet()) {
for (String partitionName : missingTopStateMap.get(resourceName).keySet()) {
- long startTime = missingTopStateMap.get(resourceName).get(partitionName);
- if (startTime > 0 && System.currentTimeMillis() - startTime > durationThreshold) {
+ Long startTime = missingTopStateMap.get(resourceName).get(partitionName);
+ if (startTime != null && startTime > 0
+ && System.currentTimeMillis() - startTime > durationThreshold) {
missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED);
if (clusterStatusMonitor != null) {
clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false);
http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 6036f34..6d483a0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -74,7 +74,8 @@ public class TaskAssignmentStage extends AbstractBaseStage {
List<Message> outputMessages =
batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
manager.getProperties());
- sendMessages(dataAccessor, outputMessages);
+
+ List<Message> messagesSent = sendMessages(dataAccessor, outputMessages);
// TODO: Need also count messages from task rebalancer
if (!cache.isTaskCache()) {
ClusterStatusMonitor clusterStatusMonitor =
@@ -84,7 +85,7 @@ public class TaskAssignmentStage extends AbstractBaseStage {
}
}
long cacheStart = System.currentTimeMillis();
- cache.cacheMessages(outputMessages);
+ cache.cacheMessages(messagesSent);
long cacheEnd = System.currentTimeMillis();
LogUtil.logDebug(logger, _eventId, "Caching messages took " + (cacheEnd - cacheStart) + " ms");
}
@@ -132,9 +133,11 @@ public class TaskAssignmentStage extends AbstractBaseStage {
return outputMessages;
}
- protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
+ // return the messages actually sent
+ protected List<Message> sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
+ List<Message> messageSent = new ArrayList<>();
if (messages == null || messages.isEmpty()) {
- return;
+ return messageSent;
}
Builder keyBuilder = dataAccessor.keyBuilder();
@@ -146,6 +149,7 @@ public class TaskAssignmentStage extends AbstractBaseStage {
+ message.getResourceName() + "." + message.getPartitionName() + "|" + message
.getPartitionNames() + " from:" + message.getFromState() + " to:" + message
.getToState() + ", relayMessages: " + message.getRelayMessages().size());
+
if (message.hasRelayMessages()) {
for (Message msg : message.getRelayMessages().values()) {
LogUtil.logInfo(logger, _eventId,
@@ -163,8 +167,12 @@ public class TaskAssignmentStage extends AbstractBaseStage {
boolean[] results = dataAccessor.createChildren(keys, new ArrayList<>(messages));
for (int i = 0; i < results.length; i++) {
if (!results[i]) {
- LogUtil.logWarn(logger, _eventId, "Failed to send message: " + keys.get(i));
+ LogUtil.logError(logger, _eventId, "Failed to send message: " + keys.get(i));
+ } else {
+ messageSent.add(messages.get(i));
}
}
+
+ return messageSent;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 925f127..5e2082c 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -874,12 +874,16 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
} else if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
&& isStateTransitionInProgress(messageTarget)) {
+ String taskId = _messageTaskMap.get(messageTarget);
+ Message msg = _taskMap.get(taskId).getTask().getMessage();
+
// If there is another state transition for same partition is going on,
// discard the message. Controller will resend if this is a valid message
- throw new HelixException(String
- .format("Another state transition for %s:%s is in progress. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(), message.getFromState(),
- message.getToState()));
+ throw new HelixException(String.format(
+ "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
+ message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
+ msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
+ message.getToState()));
}
stateTransitionHandlers
http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
index b3ef3e5..bf7f566 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -80,8 +80,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
HelixDataAccessor _accessor;
@BeforeClass
- public void beforeClass()
- throws InterruptedException {
+ public void beforeClass() {
System.out.println(
"START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
@@ -136,7 +135,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
}
@Test
- public void testP2PStateTransitionDisabled() throws InterruptedException {
+ public void testP2PStateTransitionDisabled() {
enableP2PInCluster(CLUSTER_NAME, _configAccessor, false);
MockHelixTaskExecutor.resetStats();
@@ -158,7 +157,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
}
@Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
- public void testP2PStateTransitionEnabled() throws InterruptedException {
+ public void testP2PStateTransitionEnabled() {
enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
long startTime = System.currentTimeMillis();
MockHelixTaskExecutor.resetStats();
@@ -174,9 +173,9 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
}
double ratio = ((double) p2pTrigged) / ((double) total);
- Assert.assertTrue(ratio > 0.7, String
- .format("Only %d out of %d percent transitions to Master were triggered by expected host!",
- p2pTrigged, total));
+ Assert.assertTrue(ratio > 0.6, String
+ .format("Only %d out of %d percent transitions to Master were triggered by expected host!",
+ p2pTrigged, total));
Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
"There are duplicated transition messages sent while participant is handling the state-transition!");