You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/09/17 22:26:54 UTC

[1/3] helix git commit: Fix P2P message logic in controller to avoid sending duplicated messages to participants.

Repository: helix
Updated Branches:
  refs/heads/master f1c503712 -> 74145e8ad


http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index a88965b..c8048fa 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -21,6 +21,7 @@ package org.apache.helix.messaging.p2pMessage;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.pipeline.Pipeline;
@@ -54,8 +55,8 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
 
 
   private void preSetup() {
-    setupIdealState(3, new String[]{db}, numPartition, numReplica, IdealState.RebalanceMode.SEMI_AUTO,
-        BuiltInStateModelDefinitions.MasterSlave.name());
+    setupIdealState(3, new String[] { db }, numPartition, numReplica,
+        IdealState.RebalanceMode.SEMI_AUTO, BuiltInStateModelDefinitions.MasterSlave.name());
     setupStateModel();
     setupInstances(3);
     setupLiveInstances(3);
@@ -77,10 +78,194 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
     testP2PMessage(null, false);
   }
 
-  private void testP2PMessage(ClusterConfig clusterConfig, Boolean p2pMessageEnabled) throws Exception {
-    Map<String, Resource> resourceMap =
-        getResourceMap(new String[]{db}, numPartition, BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig,
-            null);
+  @Test
+  public void testAvoidDuplicatedMessageWithP2PEnabled() throws Exception {
+    preSetup();
+    ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
+    clusterConfig.enableP2PMessage(true);
+    setClusterConfig(clusterConfig);
+
+    Map<String, Resource> resourceMap = getResourceMap(new String[] { db }, numPartition,
+        BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig, null);
+
+    ClusterDataCache cache = new ClusterDataCache();
+    cache.setAsyncTasksThreadPool(Executors.newSingleThreadExecutor());
+    event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+
+    Pipeline pipeline = createPipeline();
+    pipeline.handle(event);
+
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+    CurrentStateOutput currentStateOutput =
+        populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    Partition p = new Partition(db + "_0");
+
+    String masterInstance = getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+        MasterSlaveSMD.States.MASTER.name());
+    Assert.assertNotNull(masterInstance);
+
+    admin.enableInstance(_clusterName, masterInstance, false);
+    cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    cache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
+
+    pipeline = createPipeline();
+    pipeline.handle(event);
+
+    bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+    MessageSelectionStageOutput messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    List<Message> messages = messageOutput.getMessages(db, p);
+
+    Assert.assertEquals(messages.size(), 1);
+    Message toSlaveMessage = messages.get(0);
+    Assert.assertEquals(toSlaveMessage.getTgtName(), masterInstance);
+    Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
+    Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
+
+    // verify p2p message sent to the old master instance
+    Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);
+    String newMasterInstance =
+        getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+            MasterSlaveSMD.States.MASTER.name());
+
+    Message relayMessage = toSlaveMessage.getRelayMessage(newMasterInstance);
+    Assert.assertNotNull(relayMessage);
+    Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+    Assert.assertEquals(relayMessage.getTgtName(), newMasterInstance);
+    Assert.assertEquals(relayMessage.getRelaySrcHost(), masterInstance);
+    Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
+
+
+    // test the old master finish state transition, but has not forward p2p message yet.
+    currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
+    currentStateOutput.setPendingMessage(db, p, masterInstance, toSlaveMessage);
+    currentStateOutput.setPendingRelayMessage(db, p, masterInstance, relayMessage);
+
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    pipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(db, p);
+    Assert.assertEquals(messages.size(), 0);
+
+
+    currentStateOutput =
+        populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+    currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
+    currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    pipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(db, p);
+    Assert.assertEquals(messages.size(), 1);
+
+    Message toOfflineMessage = messages.get(0);
+    Assert.assertEquals(toOfflineMessage.getTgtName(), masterInstance);
+    Assert.assertEquals(toOfflineMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(toOfflineMessage.getToState(), MasterSlaveSMD.States.OFFLINE.name());
+
+
+    // Now, the old master finish state transition, but has not forward p2p message yet.
+    // Then the preference list has changed, so now the new master is different from previously calculated new master
+    // but controller should not send S->M to newly calculated master.
+    currentStateOutput.setCurrentState(db, p, masterInstance, "OFFLINE");
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    String slaveInstance =
+        getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+            MasterSlaveSMD.States.SLAVE.name());
+
+    Map<String, String> instanceStateMap = bestPossibleStateOutput.getInstanceStateMap(db, p);
+    instanceStateMap.put(newMasterInstance, "SLAVE");
+    instanceStateMap.put(slaveInstance, "MASTER");
+    bestPossibleStateOutput.setState(db, p, instanceStateMap);
+
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+
+    pipeline = new Pipeline("test");
+    pipeline.addStage(new IntermediateStateCalcStage());
+    pipeline.addStage(new MessageGenerationPhase());
+    pipeline.addStage(new MessageSelectionStage());
+    pipeline.addStage(new MessageThrottleStage());
+
+    pipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(db, p);
+    Assert.assertEquals(messages.size(), 0);
+
+
+    // Now, the old master has forwarded the p2p master to previously calculated master,
+    // So the state-transition still happened in previously calculated master.
+    // Controller will not send S->M to new master.
+    currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), bestPossibleStateOutput);
+
+
+    pipeline = new Pipeline("test");
+    pipeline.addStage(new IntermediateStateCalcStage());
+    pipeline.addStage(new MessageGenerationPhase());
+    pipeline.addStage(new MessageSelectionStage());
+    pipeline.addStage(new MessageThrottleStage());
+
+    pipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(db, p);
+    Assert.assertEquals(messages.size(), 0);
+
+
+    // now, the previous calculated master completed the state transition and deleted the p2p message.
+    // Controller should drop this master first.
+    currentStateOutput =
+        populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+    currentStateOutput.setCurrentState(db, p, newMasterInstance, "MASTER");
+    currentStateOutput.setCurrentState(db, p, slaveInstance, "SLAVE");
+
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    pipeline = new Pipeline("test");
+    pipeline.addStage(new MessageGenerationPhase());
+    pipeline.addStage(new MessageSelectionStage());
+    pipeline.addStage(new MessageThrottleStage());
+
+    pipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(db, p);
+    Assert.assertEquals(messages.size(), 1);
+
+    toSlaveMessage = messages.get(0);
+    Assert.assertEquals(toSlaveMessage.getTgtName(), newMasterInstance);
+    Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
+    Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
+  }
+
+  private void testP2PMessage(ClusterConfig clusterConfig, Boolean p2pMessageEnabled)
+      throws Exception {
+    Map<String, Resource> resourceMap = getResourceMap(new String[] { db }, numPartition,
+        BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig, null);
 
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
@@ -90,26 +275,30 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
     Pipeline pipeline = createPipeline();
     pipeline.handle(event);
 
-    BestPossibleStateOutput bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
 
-    CurrentStateOutput currentStateOutput = populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+    CurrentStateOutput currentStateOutput =
+        populateCurrentStateFromBestPossible(bestPossibleStateOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     Partition p = new Partition(db + "_0");
 
-    String masterInstance =
-        getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p), MasterSlaveSMD.States.MASTER.name());
+    String masterInstance = getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+        MasterSlaveSMD.States.MASTER.name());
     Assert.assertNotNull(masterInstance);
 
     admin.enableInstance(_clusterName, masterInstance, false);
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
     cache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
 
+    pipeline = createPipeline();
     pipeline.handle(event);
 
     bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
 
-    MessageSelectionStageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    MessageSelectionStageOutput messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     List<Message> messages = messageOutput.getMessages(db, p);
 
     Assert.assertEquals(messages.size(), 1);
@@ -121,7 +310,8 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
     if (p2pMessageEnabled) {
       Assert.assertEquals(message.getRelayMessages().entrySet().size(), 1);
       String newMasterInstance =
-          getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p), MasterSlaveSMD.States.MASTER.name());
+          getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+              MasterSlaveSMD.States.MASTER.name());
 
       Message relayMessage = message.getRelayMessage(newMasterInstance);
       Assert.assertNotNull(relayMessage);
@@ -146,7 +336,8 @@ public class TestP2PStateTransitionMessages extends BaseStageTest {
     return masterInstance;
   }
 
-  private CurrentStateOutput populateCurrentStateFromBestPossible(BestPossibleStateOutput bestPossibleStateOutput) {
+  private CurrentStateOutput populateCurrentStateFromBestPossible(
+      BestPossibleStateOutput bestPossibleStateOutput) {
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
     for (String resource : bestPossibleStateOutput.getResourceStatesMap().keySet()) {
       PartitionStateMap partitionStateMap = bestPossibleStateOutput.getPartitionStateMap(resource);


[2/3] helix git commit: Fix P2P message logic in controller to avoid sending duplicated messages to participants.

Posted by lx...@apache.org.
Fix P2P message logic in controller to avoid sending duplicated messages to participants.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/880f8851
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/880f8851
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/880f8851

Branch: refs/heads/master
Commit: 880f885121afecab4e186282fbf94a146a2cf04a
Parents: f1c5037
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Apr 24 18:18:40 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 17 15:08:28 2018 -0700

----------------------------------------------------------------------
 .../common/caches/InstanceMessagesCache.java    | 146 +++++++--
 .../controller/stages/ClusterDataCache.java     |  12 +-
 .../stages/CurrentStateComputationStage.java    |  57 +++-
 .../controller/stages/CurrentStateOutput.java   |  58 ++--
 .../stages/MessageGenerationPhase.java          |  17 +-
 .../stages/MessageSelectionStage.java           |  31 +-
 .../controller/stages/TaskAssignmentStage.java  |   7 +-
 .../messaging/handling/HelixTaskExecutor.java   |  22 +-
 .../java/org/apache/helix/model/Message.java    |  12 +-
 .../helix/task/AbstractTaskDispatcher.java      |   2 +-
 .../helix/task/DeprecatedTaskRebalancer.java    |   2 +-
 .../FixedTargetTaskAssignmentCalculator.java    |   4 +-
 .../org/apache/helix/task/JobRebalancer.java    |   5 +-
 .../ZkHelixClusterVerifier.java                 |   2 +-
 .../org/apache/helix/common/ZkTestBase.java     |  34 ++
 .../TestCurrentStateComputationStage.java       |   2 +-
 .../stages/TestMsgSelectionStage.java           |   5 +-
 .../messaging/TestP2PMessageSemiAuto.java       |  68 ++--
 .../messaging/TestP2PNoDuplicatedMessage.java   | 315 +++++++++++++++++++
 .../PartitionMigration/TestExpandCluster.java   |   2 -
 .../handling/MockHelixTaskExecutor.java         | 111 +++++++
 .../TestP2PMessagesAvoidDuplicatedMessage.java  |  13 +-
 .../TestP2PStateTransitionMessages.java         | 217 ++++++++++++-
 23 files changed, 990 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index f8001da..13b77cc 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -22,7 +22,7 @@ package org.apache.helix.common.caches;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -43,9 +43,16 @@ import org.slf4j.LoggerFactory;
 public class InstanceMessagesCache {
   private static final Logger LOG = LoggerFactory.getLogger(InstanceMessagesCache.class.getName());
   private Map<String, Map<String, Message>> _messageMap;
+  private Map<String, Map<String, Message>> _relayMessageMap;
 
   // maintain a cache of participant messages across pipeline runs
+  // <instance -> {<MessageId, Message>}>
   private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+
+  // maintain a set of valid pending P2P messages.
+  // <instance -> {<MessageId, Message>}>
+  private Map<String, Map<String, Message>> _relayMessageCache = Maps.newHashMap();
+
   private String _clusterName;
 
   public InstanceMessagesCache(String clusterName) {
@@ -54,17 +61,15 @@ public class InstanceMessagesCache {
 
   /**
    * This refreshes all pending messages in the cluster by re-fetching the data from zookeeper in an
-   * efficient way
-   * current state must be refreshed before refreshing relay messages because we need to use current
-   * state to validate all relay messages.
+   * efficient way current state must be refreshed before refreshing relay messages because we need
+   * to use current state to validate all relay messages.
    *
    * @param accessor
    * @param liveInstanceMap
    *
    * @return
    */
-  public boolean refresh(HelixDataAccessor accessor,
-      Map<String, LiveInstance> liveInstanceMap) {
+  public boolean refresh(HelixDataAccessor accessor, Map<String, LiveInstance> liveInstanceMap) {
     LOG.info("START: InstanceMessagesCache.refresh()");
     long startTime = System.currentTimeMillis();
 
@@ -124,13 +129,16 @@ public class InstanceMessagesCache {
           System.currentTimeMillis() - startTime) + " ms.");
     }
 
+    LOG.info("END: InstanceMessagesCache.refresh()");
+
     return true;
   }
 
   // update all valid relay messages attached to existing state transition messages into message map.
   public void updateRelayMessages(Map<String, LiveInstance> liveInstanceMap,
       Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
-    List<Message> relayMessages = new ArrayList<>();
+
+    // refresh _relayMessageCache
     for (String instance : _messageMap.keySet()) {
       Map<String, Message> instanceMessages = _messageMap.get(instance);
       Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
@@ -170,51 +178,131 @@ public class InstanceMessagesCache {
 
           for (Message relayMsg : message.getRelayMessages().values()) {
             relayMsg.setRelayTime(transitionCompleteTime);
-            relayMessages.add(relayMsg);
+            cacheRelayMessage(relayMsg);
           }
         }
       }
     }
 
-    for (Message message : relayMessages) {
-      String instance = message.getTgtName();
-      Map<String, Message> instanceMessages = _messageMap.get(instance);
-      if (instanceMessages == null) {
-        instanceMessages = new HashMap<>();
-        _messageMap.put(instance, instanceMessages);
+    Map<String, Map<String, Message>> relayMessageMap = new HashMap<>();
+    // refresh _relayMessageMap
+    for (String instance : _relayMessageCache.keySet()) {
+      Map<String, Message> messages = _relayMessageCache.get(instance);
+      Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+          currentStateMap.get(instance);
+      if (instanceCurrentStateMap == null) {
+        continue;
+      }
+
+      Iterator<Map.Entry<String, Message>> iterator = messages.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Message message = iterator.next().getValue();
+        String sessionId = message.getTgtSessionId();
+        String resourceName = message.getResourceName();
+        String partitionName = message.getPartitionName();
+        String targetState = message.getToState();
+        String instanceSessionId = liveInstanceMap.get(instance).getSessionId();
+
+        if (_messageMap.get(instance).containsKey(message.getMsgId())) {
+          // relay message has already been sent to target host
+          // remove the message from relayMessageCache.
+          LOG.info(
+              "Relay message has already been sent to target host, remove relay message from the cache"
+                  + message.getId());
+          iterator.remove();
+          continue;
+        }
+
+        if (!instanceSessionId.equals(sessionId)) {
+          LOG.info(
+              "Instance SessionId does not match, remove relay message from the cache" + message
+                  .getId());
+          iterator.remove();
+          continue;
+        }
+
+        Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
+        if (sessionCurrentStateMap == null) {
+          LOG.info("No sessionCurrentStateMap found, ignore relay message from the cache" + message
+              .getId());
+          continue;
+        }
+        CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+        if (currentState != null && targetState.equals(currentState.getState(partitionName))) {
+          LOG.info("CurrentState " + currentState
+              + " match the target state of the relay message, remove relay from cache." + message
+              .getId());
+          iterator.remove();
+          continue;
+        }
+
+        if (message.isExpired()) {
+          LOG.info("relay message " + message.getId() + " expired, remove it from cache."
+              + message.getId());
+          iterator.remove();
+          continue;
+        }
+
+        if (!relayMessageMap.containsKey(instance)) {
+          relayMessageMap.put(instance, Maps.<String, Message>newHashMap());
+        }
+        relayMessageMap.get(instance).put(message.getMsgId(), message);
       }
-      instanceMessages.put(message.getId(), message);
     }
+
+    _relayMessageMap = Collections.unmodifiableMap(relayMessageMap);
   }
 
   /**
-   * Provides a list of current outstanding transitions on a given instance.
+   * Provides a list of current outstanding pending state transition messages on a given instance.
    *
    * @param instanceName
    *
    * @return
    */
   public Map<String, Message> getMessages(String instanceName) {
-    Map<String, Message> map = _messageMap.get(instanceName);
-    if (map != null) {
-      return map;
-    } else {
-      return Collections.emptyMap();
+    if (_messageMap.containsKey(instanceName)) {
+      return _messageMap.get(instanceName);
     }
+    return Collections.emptyMap();
   }
 
-  public void cacheMessages(List<Message> messages) {
+  /**
+   * Provides a list of current outstanding pending relay (p2p) messages on a given instance.
+   *
+   * @param instanceName
+   *
+   * @return
+   */
+  public Map<String, Message> getRelayMessages(String instanceName) {
+    if (_relayMessageMap.containsKey(instanceName)) {
+      return _relayMessageMap.get(instanceName);
+    }
+    return Collections.emptyMap();
+  }
+
+  public void cacheMessages(Collection<Message> messages) {
     for (Message message : messages) {
       String instanceName = message.getTgtName();
-      Map<String, Message> instMsgMap;
-      if (_messageCache.containsKey(instanceName)) {
-        instMsgMap = _messageCache.get(instanceName);
-      } else {
-        instMsgMap = Maps.newHashMap();
-        _messageCache.put(instanceName, instMsgMap);
+      if (!_messageCache.containsKey(instanceName)) {
+        _messageCache.put(instanceName, Maps.<String, Message>newHashMap());
       }
-      instMsgMap.put(message.getId(), message);
+      _messageCache.get(instanceName).put(message.getId(), message);
+
+      if (message.hasRelayMessages()) {
+        for (Message relayMsg : message.getRelayMessages().values()) {
+          cacheRelayMessage(relayMsg);
+        }
+      }
+    }
+  }
+
+  protected void cacheRelayMessage(Message message) {
+    String instanceName = message.getTgtName();
+    if (!_relayMessageCache.containsKey(instanceName)) {
+      _relayMessageCache.put(instanceName, Maps.<String, Message>newHashMap());
     }
+    _relayMessageCache.get(instanceName).put(message.getId(), message);
   }
 
   @Override public String toString() {

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 3e6bd86..577b2c7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -476,7 +477,16 @@ public class ClusterDataCache {
     return _instanceMessagesCache.getMessages(instanceName);
   }
 
-  public void cacheMessages(List<Message> messages) {
+  /**
+   * Provides a list of current outstanding pending relay messages on a given instance.
+   * @param instanceName
+   * @return
+   */
+  public Map<String, Message> getRelayMessages(String instanceName) {
+    return _instanceMessagesCache.getRelayMessages(instanceName);
+  }
+
+  public void cacheMessages(Collection<Message> messages) {
     _instanceMessagesCache.cacheMessages(messages);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index a56d194..9cc9506 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -66,10 +66,12 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
 
       // update pending messages
       Map<String, Message> messages = cache.getMessages(instanceName);
-      updatePendingMessages(instance, messages.values(), currentStateOutput, resourceMap);
+      Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
+      updatePendingMessages(instance, messages.values(), currentStateOutput, relayMessages.values(), resourceMap);
 
       // update current states.
-      Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName, instanceSessionId);
+      Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName,
+          instanceSessionId);
       updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap);
     }
 
@@ -84,7 +86,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
 
   // update all pending messages to CurrentStateOutput.
   private void updatePendingMessages(LiveInstance instance, Collection<Message> pendingMessages,
-      CurrentStateOutput currentStateOutput, Map<String, Resource> resourceMap) {
+      CurrentStateOutput currentStateOutput, Collection<Message> pendingRelayMessages,
+      Map<String, Resource> resourceMap) {
     String instanceName = instance.getInstanceName();
     String instanceSessionId = instance.getSessionId();
 
@@ -100,6 +103,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       String resourceName = message.getResourceName();
       Resource resource = resourceMap.get(resourceName);
       if (resource == null) {
+        LogUtil.logInfo(LOG, _eventId, String.format(
+            "Ignore a pending relay message %s for a non-exist resource %s and partition %s",
+            message.getMsgId(), resourceName, message.getPartitionName()));
         continue;
       }
 
@@ -109,7 +115,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
         if (partition != null) {
           setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
         } else {
-          // log
+          LogUtil.logInfo(LOG, _eventId, String
+              .format("Ignore a pending message %s for a non-exist resource %s and partition %s",
+                  message.getMsgId(), resourceName, message.getPartitionName()));
         }
       } else {
         List<String> partitionNames = message.getPartitionNames();
@@ -119,12 +127,47 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
             if (partition != null) {
               setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
             } else {
-              // log
+              LogUtil.logInfo(LOG, _eventId, String.format(
+                  "Ignore a pending message %s for a non-exist resource %s and partition %s",
+                  message.getMsgId(), resourceName, message.getPartitionName()));
             }
           }
         }
       }
     }
+
+
+    // update all pending relay messages
+    for (Message message : pendingRelayMessages) {
+      if (!message.isRelayMessage()) {
+        LogUtil.logWarn(LOG, _eventId,
+            String.format("Not a relay message %s, ignored!", message.getMsgId()));
+        continue;
+      }
+      String resourceName = message.getResourceName();
+      Resource resource = resourceMap.get(resourceName);
+      if (resource == null) {
+        LogUtil.logInfo(LOG, _eventId, String.format(
+            "Ignore a pending relay message %s for a non-exist resource %s and partition %s",
+            message.getMsgId(), resourceName, message.getPartitionName()));
+        continue;
+      }
+
+      if (!message.getBatchMessageMode()) {
+        String partitionName = message.getPartitionName();
+        Partition partition = resource.getPartition(partitionName);
+        if (partition != null) {
+          currentStateOutput.setPendingRelayMessage(resourceName, partition, instanceName, message);
+        } else {
+          LogUtil.logInfo(LOG, _eventId, String.format(
+              "Ignore a pending relay message %s for a non-exist resource %s and partition %s",
+              message.getMsgId(), resourceName, message.getPartitionName()));
+        }
+      } else {
+        LogUtil.logWarn(LOG, _eventId, String
+            .format("A relay message %s should not be batched, ignored!", message.getMsgId()));
+      }
+    }
   }
 
   // update current states in CurrentStateOutput
@@ -169,9 +212,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
   private void setMessageState(CurrentStateOutput currentStateOutput, String resourceName,
       Partition partition, String instanceName, Message message) {
     if (MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())) {
-      currentStateOutput.setPendingState(resourceName, partition, instanceName, message);
+      currentStateOutput.setPendingMessage(resourceName, partition, instanceName, message);
     } else {
-      currentStateOutput.setCancellationState(resourceName, partition, instanceName, message);
+      currentStateOutput.setCancellationMessage(resourceName, partition, instanceName, message);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 4ebef97..b634703 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -37,8 +37,9 @@ import com.google.common.collect.Sets;
  */
 public class CurrentStateOutput {
   private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
-  private final Map<String, Map<Partition, Map<String, Message>>> _pendingStateMap;
-  private final Map<String, Map<Partition, Map<String, Message>>> _cancellationStateMap;
+  private final Map<String, Map<Partition, Map<String, Message>>> _pendingMessageMap;
+  private final Map<String, Map<Partition, Map<String, Message>>> _cancellationMessageMap;
+  private final Map<String, Map<Partition, Map<String, Message>>> _pendingRelayMessageMap;
 
   // resourceName -> (Partition -> (instanceName -> endTime))
   // Note that startTime / endTime in CurrentState marks that of state transition
@@ -61,8 +62,9 @@ public class CurrentStateOutput {
 
   public CurrentStateOutput() {
     _currentStateMap = new HashMap<>();
-    _pendingStateMap = new HashMap<>();
-    _cancellationStateMap = new HashMap<>();
+    _pendingMessageMap = new HashMap<>();
+    _pendingRelayMessageMap = new HashMap<>();
+    _cancellationMessageMap = new HashMap<>();
     _currentStateEndTimeMap = new HashMap<>();
     _resourceStateModelMap = new HashMap<>();
     _curStateMetaMap = new HashMap<>();
@@ -140,9 +142,9 @@ public class CurrentStateOutput {
     _infoMap.get(resourceName).get(partition).put(instanceName, state);
   }
 
-  public void setPendingState(String resourceName, Partition partition, String instanceName,
+  public void setPendingMessage(String resourceName, Partition partition, String instanceName,
       Message message) {
-    setStateMessage(resourceName, partition, instanceName, message, _pendingStateMap);
+    setStateMessage(resourceName, partition, instanceName, message, _pendingMessageMap);
   }
 
   /**
@@ -152,9 +154,14 @@ public class CurrentStateOutput {
    * @param instanceName
    * @param message
    */
-  public void setCancellationState(String resourceName, Partition partition, String instanceName,
+  public void setCancellationMessage(String resourceName, Partition partition, String instanceName,
       Message message) {
-    setStateMessage(resourceName, partition, instanceName, message, _cancellationStateMap);
+    setStateMessage(resourceName, partition, instanceName, message, _cancellationMessageMap);
+  }
+
+  public void setPendingRelayMessage(String resourceName, Partition partition, String instanceName,
+      Message message) {
+    setStateMessage(resourceName, partition, instanceName, message, _pendingRelayMessageMap);
   }
 
   private void setStateMessage(String resourceName, Partition partition, String instanceName,
@@ -221,15 +228,24 @@ public class CurrentStateOutput {
   }
 
   /**
-   * given (resource, partition, instance), returns toState
+   * given (resource, partition, instance), returns pending message on this instance.
    * @param resourceName
    * @param partition
    * @param instanceName
    * @return pending message
    */
-  // TODO: this should return toState, not pending message, create a separate method
-  public Message getPendingState(String resourceName, Partition partition, String instanceName) {
-    return getStateMessage(resourceName, partition, instanceName, _pendingStateMap);
+  public Message getPendingMessage(String resourceName, Partition partition, String instanceName) {
+    return getStateMessage(resourceName, partition, instanceName, _pendingMessageMap);
+  }
+
+  public Map<String, Message> getPendingRelayMessageMap(String resourceName, Partition partition) {
+    if (_pendingRelayMessageMap.containsKey(resourceName)) {
+      Map<Partition, Map<String, Message>> map = _pendingRelayMessageMap.get(resourceName);
+      if (map.containsKey(partition)) {
+        return map.get(partition);
+      }
+    }
+    return Collections.emptyMap();
   }
 
   /**
@@ -239,9 +255,9 @@ public class CurrentStateOutput {
    * @param instanceName
    * @return
    */
-  public Message getCancellationState(String resourceName, Partition partition,
+  public Message getCancellationMessage(String resourceName, Partition partition,
       String instanceName) {
-    return getStateMessage(resourceName, partition, instanceName, _cancellationStateMap);
+    return getStateMessage(resourceName, partition, instanceName, _cancellationMessageMap);
   }
 
   private Message getStateMessage(String resourceName, Partition partition, String instanceName,
@@ -291,8 +307,8 @@ public class CurrentStateOutput {
    * @return pending target state map
    */
   public Map<String, String> getPendingStateMap(String resourceName, Partition partition) {
-    if (_pendingStateMap.containsKey(resourceName)) {
-      Map<Partition, Map<String, Message>> map = _pendingStateMap.get(resourceName);
+    if (_pendingMessageMap.containsKey(resourceName)) {
+      Map<Partition, Map<String, Message>> map = _pendingMessageMap.get(resourceName);
       if (map.containsKey(partition)) {
         Map<String, Message> pendingMsgMap = map.get(partition);
         Map<String, String> pendingStateMap = new HashMap<String, String>();
@@ -312,8 +328,8 @@ public class CurrentStateOutput {
    * @return pending messages map
    */
   public Map<String, Message> getPendingMessageMap(String resourceName, Partition partition) {
-    if (_pendingStateMap.containsKey(resourceName)) {
-      Map<Partition, Map<String, Message>> map = _pendingStateMap.get(resourceName);
+    if (_pendingMessageMap.containsKey(resourceName)) {
+      Map<Partition, Map<String, Message>> map = _pendingMessageMap.get(resourceName);
       if (map.containsKey(partition)) {
         return map.get(partition);
       }
@@ -328,7 +344,7 @@ public class CurrentStateOutput {
    */
   public Set<Partition> getCurrentStateMappedPartitions(String resourceId) {
     Map<Partition, Map<String, String>> currentStateMap = _currentStateMap.get(resourceId);
-    Map<Partition, Map<String, Message>> pendingStateMap = _pendingStateMap.get(resourceId);
+    Map<Partition, Map<String, Message>> pendingStateMap = _pendingMessageMap.get(resourceId);
     Set<Partition> partitionSet = Sets.newHashSet();
     if (currentStateMap != null) {
       partitionSet.addAll(currentStateMap.keySet());
@@ -346,7 +362,7 @@ public class CurrentStateOutput {
    * @return set of participants to partitions mapping
    */
   public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
-    return getPartitionCountWithState(resourceStateModel, state, (Map) _pendingStateMap);
+    return getPartitionCountWithState(resourceStateModel, state, (Map) _pendingMessageMap);
   }
 
   /**
@@ -397,7 +413,7 @@ public class CurrentStateOutput {
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("current state= ").append(_currentStateMap);
-    sb.append(", pending state= ").append(_pendingStateMap);
+    sb.append(", pending state= ").append(_pendingMessageMap);
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index e21c607..3abc965 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -82,7 +82,7 @@ public class MessageGenerationPhase extends AbstractBaseStage {
     }
 
     Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    Map<String, String> sessionIdMap = new HashMap<String, String>();
+    Map<String, String> sessionIdMap = new HashMap<>();
 
     for (LiveInstance liveInstance : liveInstances.values()) {
       sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
@@ -101,7 +101,7 @@ public class MessageGenerationPhase extends AbstractBaseStage {
 
       for (Partition partition : resource.getPartitions()) {
 
-        Map<String, String> instanceStateMap = new HashMap<String, String>(
+        Map<String, String> instanceStateMap = new HashMap<>(
             intermediateStateOutput.getInstanceStateMap(resourceName, partition));
         Map<String, String> pendingStateMap =
             currentStateOutput.getPendingStateMap(resourceName, partition);
@@ -120,24 +120,27 @@ public class MessageGenerationPhase extends AbstractBaseStage {
         // we should generate message based on the desired-state priority
         // so keep generated messages in a temp map keyed by state
         // desired-state->list of generated-messages
-        Map<String, List<Message>> messageMap = new HashMap<String, List<Message>>();
+        Map<String, List<Message>> messageMap = new HashMap<>();
 
         for (String instanceName : instanceStateMap.keySet()) {
           String desiredState = instanceStateMap.get(instanceName);
 
-          String currentState = currentStateOutput.getCurrentState(resourceName, partition, instanceName);
+          String currentState = currentStateOutput.getCurrentState(resourceName, partition,
+              instanceName);
           if (currentState == null) {
             currentState = stateModelDef.getInitialState();
           }
 
-          Message pendingMessage = currentStateOutput.getPendingState(resourceName, partition, instanceName);
+          Message pendingMessage = currentStateOutput.getPendingMessage(resourceName, partition,
+              instanceName);
           boolean isCancellationEnabled = cache.getClusterConfig().isStateTransitionCancelEnabled();
-          Message cancellationMessage = currentStateOutput.getCancellationState(resourceName, partition, instanceName);
+          Message cancellationMessage = currentStateOutput.getCancellationMessage(resourceName,
+              partition, instanceName);
           String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
 
           Message message = null;
 
-          if (shouldCleanUpPendingMessage(pendingMessage, currentState,
+          if (pendingMessage != null && shouldCleanUpPendingMessage(pendingMessage, currentState,
               currentStateOutput.getEndTime(resourceName, partition, instanceName))) {
             LogUtil.logInfo(logger, _eventId, String.format(
                 "Adding pending message %s on instance %s to clean up. Msg: %s->%s, current state of resource %s:%s is %s",

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index a061598..03838f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -20,11 +20,13 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.helix.controller.LogUtil;
@@ -60,6 +62,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
   public void process(ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     CurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.name());
@@ -86,6 +89,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
         List<Message> selectedMessages = selectMessages(cache.getLiveInstances(),
             currentStateOutput.getCurrentStateMap(resourceName, partition),
             currentStateOutput.getPendingMessageMap(resourceName, partition), messages,
+            currentStateOutput.getPendingRelayMessageMap(resourceName, partition).values(),
             stateConstraints, stateTransitionPriorities, stateModelDef,
             resource.isP2PMessageEnabled());
         output.addMessages(resourceName, partition, selectedMessages);
@@ -127,9 +131,9 @@ public class MessageSelectionStage extends AbstractBaseStage {
    */
   List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
       Map<String, String> currentStates, Map<String, Message> pendingMessages,
-      List<Message> messages, Map<String, Bounds> stateConstraints,
-      final Map<String, Integer> stateTransitionPriorities, StateModelDefinition stateModelDef,
-      boolean p2pMessageEnabled) {
+      List<Message> messages, Collection<Message> pendingRelayMessages,
+      Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
+      StateModelDefinition stateModelDef, boolean p2pMessageEnabled) {
     if (messages == null || messages.isEmpty()) {
       return Collections.emptyList();
     }
@@ -188,6 +192,27 @@ public class MessageSelectionStage extends AbstractBaseStage {
     for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
       for (Message message : messageList) {
         String toState = message.getToState();
+        String fromState = message.getFromState();
+
+        if (toState.equals(stateModelDef.getTopState())) {
+          // find if there are any pending relay messages match this message.
+          // if yes, rebuild the message to use the same message id from the original relay message.
+          for (Message relayMsg : pendingRelayMessages) {
+            if (relayMsg.getToState().equals(toState) && relayMsg.getFromState()
+                .equals(fromState)) {
+              if (relayMsg.getTgtName().equals(message.getTgtName())) {
+                message = new Message(message, relayMsg.getMsgId());
+              } else {
+                // if there are pending relay message that was sent to a different host than the current message
+                // we should not send the toState message now.
+                LOG.info(
+                    "There is pending relay message to a different host, not send message: {}, pending relay message: {}",
+                    message, relayMsg);
+                continue;
+              }
+            }
+          }
+        }
 
         if (stateConstraints.containsKey(toState)) {
           int newCnt = (stateCnts.containsKey(toState) ? stateCnts.get(toState) + 1 : 1);

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index c196a26..6036f34 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -160,6 +160,11 @@ public class TaskAssignmentStage extends AbstractBaseStage {
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
     }
 
-    dataAccessor.createChildren(keys, new ArrayList<>(messages));
+    boolean[] results = dataAccessor.createChildren(keys, new ArrayList<>(messages));
+    for (int i = 0; i < results.length; i++) {
+      if (!results[i]) {
+        LogUtil.logWarn(logger, _eventId, "Failed to send message: " + keys.get(i));
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 9734cc8..925f127 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -488,8 +488,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
           return true;
         } else {
           _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
-              "fail to cancel task: " + taskId,
-              notificationContext.getManager());
+              "fail to cancel task: " + taskId, notificationContext.getManager());
         }
       } else {
         _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
@@ -789,14 +788,6 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         continue;
       }
 
-      if (message.isExpired()) {
-        LOG.info(
-            "Dropping expired message. mid: " + message.getId() + ", from: " + message.getMsgSrc() + " relayed from: "
-                + message.getRelaySrcHost());
-        reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
-        continue;
-      }
-
       String tgtSessionId = message.getTgtSessionId();
       // sessionId mismatch normally means message comes from expired session, just remove it
       if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
@@ -843,6 +834,14 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         continue;
       }
 
+      if (message.isExpired()) {
+        LOG.info(
+            "Dropping expired message. mid: " + message.getId() + ", from: " + message.getMsgSrc()
+                + " relayed from: " + message.getRelaySrcHost());
+        reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
+        continue;
+      }
+
       // State Transition Cancellation
       if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
         boolean success = cancelNotStartedStateTransition(message, stateTransitionHandlers, accessor, instanceName);
@@ -874,6 +873,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
                 duplicatedMessage.getToState(), message.getFromState(), message.getToState()));
           } else if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
               && isStateTransitionInProgress(messageTarget)) {
+
             // If there is another state transition for same partition is going on,
             // discard the message. Controller will resend if this is a valid message
             throw new HelixException(String
@@ -1095,7 +1095,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         && stateTranstionMessage.getToState().equalsIgnoreCase(cancellationMessage.getToState());
   }
 
-  private String getMessageTarget(String resourceName, String partitionName) {
+  String getMessageTarget(String resourceName, String partitionName) {
     return String.format("%s_%s", resourceName, partitionName);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 8195092..51d03cb 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -178,6 +178,16 @@ public class Message extends HelixProperty {
   }
 
   /**
+   * Instantiate a message with a new id
+   * @param message message to be copied
+   * @param id unique message identifier
+   */
+  public Message(Message message, String id) {
+    super(new ZNRecord(message.getRecord(), id));
+    setMsgId(id);
+  }
+
+  /**
    * Set a subtype of the message
    * @param subType name of the subtype
    */
@@ -820,7 +830,7 @@ public class Message extends HelixProperty {
     // use relay time if this is a relay message
     if (isRelayMessage()) {
       long relayTime = getRelayTime();
-      return relayTime <= 0 || (relayTime + expiry < current);
+      return (relayTime > 0 && (relayTime + expiry < current));
     }
 
     return getCreateTimeStamp() + expiry < current;

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index eb67d59..e230fb5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -65,7 +65,7 @@ public abstract class AbstractTaskDispatcher {
 
         // Check for pending state transitions on this (partition, instance).
         Message pendingMessage =
-            currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
+            currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
         if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {
           processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance,
               pendingMessage, jobState, currState, paMap, assignedPartitions);

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 97e02fc..9903117 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -366,7 +366,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
 
         // Check for pending state transitions on this (partition, instance).
         Message pendingMessage =
-            currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
+            currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
         if (pendingMessage != null) {
           // There is a pending state transition for this (partition, instance). Just copy forward
           // the state assignment from the previous ideal state.

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 87b57c7..4389484 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -156,7 +156,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
       int pId = partitions.get(0);
       if (includeSet.contains(pId)) {
         for (String instance : instances) {
-          Message pendingMessage = currStateOutput.getPendingState(tgtIs.getResourceName(),
+          Message pendingMessage = currStateOutput.getPendingMessage(tgtIs.getResourceName(),
               new Partition(pName), instance);
           if (pendingMessage != null) {
             continue;
@@ -230,7 +230,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
           // If there is, we should wait until the pending message gets processed, so skip
           // assignment this time around
           Message pendingMessage =
-              currStateOutput.getPendingState(targetIdealState.getResourceName(),
+              currStateOutput.getPendingMessage(targetIdealState.getResourceName(),
                   new Partition(targetResourcePartitionName), instance);
           if (pendingMessage != null) {
             continue;

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index ddda41a..5b29c23 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -270,7 +270,7 @@ public class JobRebalancer extends TaskRebalancer {
           paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.TASK_ABORTED.name()));
         }
         Partition partition = new Partition(pName(jobResource, pId));
-        Message pendingMessage = currStateOutput.getPendingState(jobResource, partition, instance);
+        Message pendingMessage = currStateOutput.getPendingMessage(jobResource, partition, instance);
         // While job is failing, if the task is pending on INIT->RUNNING, set it back to INIT,
         // so that Helix will cancel the transition.
         if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT && pendingMessage != null) {
@@ -337,7 +337,8 @@ public class JobRebalancer extends TaskRebalancer {
       TaskPartitionState state = jobContext.getPartitionState(pId);
       Partition partition = new Partition(pName(jobResource, pId));
       String instance = jobContext.getAssignedParticipant(pId);
-      Message pendingMessage = currentStateOutput.getPendingState(jobResource, partition, instance);
+      Message pendingMessage = currentStateOutput.getPendingMessage(jobResource, partition,
+          instance);
       // If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished
       if (state == TaskPartitionState.RUNNING
           || (state == TaskPartitionState.INIT && pendingMessage != null)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 9d8e63d..dbf9272 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class ZkHelixClusterVerifier
     implements IZkChildListener, IZkDataListener, HelixClusterVerifier {
   private static Logger LOG = LoggerFactory.getLogger(ZkHelixClusterVerifier.class);
-  protected static int DEFAULT_TIMEOUT = 30 * 1000;
+  protected static int DEFAULT_TIMEOUT = 300 * 1000;
   protected static int DEFAULT_PERIOD = 100;
 
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 189e95c..c69744e 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -42,6 +42,7 @@ import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.Stage;
@@ -65,6 +66,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.tools.ClusterSetup;
@@ -240,6 +242,38 @@ public class ZkTestBase {
     configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig);
   }
 
+  protected void enableP2PInCluster(String clusterName, ConfigAccessor configAccessor,
+      boolean enable) {
+    // enable p2p message in cluster.
+    if (enable) {
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+      clusterConfig.enableP2PMessage(true);
+      configAccessor.setClusterConfig(clusterName, clusterConfig);
+    } else {
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+      clusterConfig.getRecord().getSimpleFields()
+          .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+      configAccessor.setClusterConfig(clusterName, clusterConfig);
+    }
+  }
+
+  protected void enableP2PInResource(String clusterName, ConfigAccessor configAccessor,
+      String dbName, boolean enable) {
+    if (enable) {
+      ResourceConfig resourceConfig =
+          new ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build();
+      configAccessor.setResourceConfig(clusterName, dbName, resourceConfig);
+    } else {
+      // remove P2P Message in resource config
+      ResourceConfig resourceConfig = configAccessor.getResourceConfig(clusterName, dbName);
+      if (resourceConfig != null) {
+        resourceConfig.getRecord().getSimpleFields()
+            .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+        configAccessor.setResourceConfig(clusterName, dbName, resourceConfig);
+      }
+    }
+  }
+
   protected void setDelayTimeInCluster(ZkClient zkClient, String clusterName, long delay) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 4479cf6..06fbf24 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -78,7 +78,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     runStage(event, stage);
     CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.name());
     String pendingState =
-        output2.getPendingState("testResourceName", new Partition("testResourceName_1"),
+        output2.getPendingMessage("testResourceName", new Partition("testResourceName_1"),
             "localhost_3").getToState();
     AssertJUnit.assertEquals(pendingState, "SLAVE");
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index 79e4e2c..45e1062 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -85,7 +86,7 @@ public class TestMsgSelectionStage {
 
     List<Message> selectedMsg =
         new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingMessages,
-            messages, stateConstraints, stateTransitionPriorities,
+            messages, Collections.<Message>emptyList(), stateConstraints, stateTransitionPriorities,
             BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false);
 
     Assert.assertEquals(selectedMsg.size(), 1);
@@ -123,7 +124,7 @@ public class TestMsgSelectionStage {
 
     List<Message> selectedMsg =
         new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingMessages,
-            messages, stateConstraints, stateTransitionPriorities,
+            messages, Collections.<Message>emptyList(), stateConstraints, stateTransitionPriorities,
             BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false);
 
     Assert.assertEquals(selectedMsg.size(), 0);

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
index 551ea78..3c95c2f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.integration.DelayedTransitionBase;
@@ -33,11 +32,9 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
-import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.MasterSlaveSMD;
-import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
@@ -55,7 +52,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
   static final String DB_NAME_1 = "TestDB_1";
   static final String DB_NAME_2 = "TestDB_2";
 
-  static final int PARTITION_NUMBER = 20;
+  static final int PARTITION_NUMBER = 200;
   static final int REPLICA_NUMBER = 3;
 
   List<MockParticipantManager> _participants = new ArrayList<>();
@@ -126,23 +123,25 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false);
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
-    verifyP2PMessage(DB_NAME_1,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
-    verifyP2PMessage(DB_NAME_2,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+    verifyP2PMessage(DB_NAME_1,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
+    verifyP2PMessage(DB_NAME_2,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
 
 
     //re-enable the old master
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
-    verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+    verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(),
+        _controller.getInstanceName(), 1);
+    verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(),
+        _controller.getInstanceName(), 1);
   }
 
   @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
   public void testP2PStateTransitionEnabledInCluster() {
-    enableP2PInCluster(true);
-    enableP2PInResource(DB_NAME_1,false);
-    enableP2PInResource(DB_NAME_2,false);
+    enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
+    enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_1,false);
+    enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_2,false);
 
     // disable the master instance
     String prevMasterInstance = _instances.get(0);
@@ -156,15 +155,17 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
-    verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
+    verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(
+        1));
+    verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(
+        1));
   }
 
   @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
   public void testP2PStateTransitionEnabledInResource() {
-    enableP2PInCluster(false);
-    enableP2PInResource(DB_NAME_1,true);
-    enableP2PInResource(DB_NAME_2,false);
+    enableP2PInCluster(CLUSTER_NAME, _configAccessor, false);
+    enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_1,true);
+    enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_2,false);
 
 
     // disable the master instance
@@ -173,7 +174,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     verifyP2PMessage(DB_NAME_1, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), prevMasterInstance);
-    verifyP2PMessage(DB_NAME_2, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+    verifyP2PMessage(DB_NAME_2, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
 
 
     //re-enable the old master
@@ -181,37 +182,14 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
-    verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
-  }
-
-  private void enableP2PInCluster(boolean enable) {
-    // enable p2p message in cluster.
-    if (enable) {
-      ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
-      clusterConfig.enableP2PMessage(true);
-      _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
-    } else {
-      ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
-      clusterConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
-      _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
-    }
+    verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
   }
 
-  private void enableP2PInResource(String dbName, boolean enable) {
-    if (enable) {
-      ResourceConfig resourceConfig = new ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build();
-      _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig);
-    } else {
-      // remove P2P Message in resource config
-      ResourceConfig resourceConfig = _configAccessor.getResourceConfig(CLUSTER_NAME, dbName);
-      if (resourceConfig != null) {
-        resourceConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
-        _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig);
-      }
-    }
+  private void verifyP2PMessage(String dbName, String instance, String expectedState, String expectedTriggerHost) {
+    verifyP2PMessage(dbName, instance, expectedState, expectedTriggerHost, 0.7);
   }
 
-  private void verifyP2PMessage(String dbName, String instance, String expectedState, String expectedTriggerHost) {
+  private void verifyP2PMessage(String dbName, String instance, String expectedState, String expectedTriggerHost, double expectedRatio) {
     ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME);
     dataCache.refresh(_accessor);
 
@@ -239,7 +217,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
     }
 
     double ratio = ((double) expectedHost) / ((double) total);
-    Assert.assertTrue(ratio >= 0.7, String
+    Assert.assertTrue(ratio >= expectedRatio, String
         .format("Only %d out of %d percent transitions to Master were triggered by expected host!",
             expectedHost, total));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
new file mode 100644
index 0000000..b3ef3e5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -0,0 +1,315 @@
+package org.apache.helix.integration.messaging;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.DelayedTransitionBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MockHelixTaskExecutor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestP2PNoDuplicatedMessage extends ZkTestBase {
+  private static Logger logger = LoggerFactory.getLogger(TestP2PNoDuplicatedMessage.class);
+
+  final String CLASS_NAME = getShortClassName();
+  final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  static final int PARTICIPANT_NUMBER = 6;
+  static final int PARTICIPANT_START_PORT = 12918;
+
+  static final int DB_COUNT = 2;
+
+  static final int PARTITION_NUMBER = 50;
+  static final int REPLICA_NUMBER = 3;
+
+  final String _controllerName = CONTROLLER_PREFIX + "_0";
+
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  List<String> _instances = new ArrayList<>();
+  ClusterControllerManager _controller;
+
+  ZkHelixClusterVerifier _clusterVerifier;
+  ConfigAccessor _configAccessor;
+  HelixDataAccessor _accessor;
+
+  @BeforeClass
+  public void beforeClass()
+      throws InterruptedException {
+    System.out.println(
+        "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+      _instances.add(instance);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      MockParticipantManager participant =
+          new TestParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
+      participant.setTransition(new DelayedTransitionBase(100));
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    for (int i = 0; i < DB_COUNT; i++) {
+      createResourceWithDelayedRebalance(CLUSTER_NAME, "TestDB_" + i,
+          BuiltInStateModelDefinitions.MasterSlave.name(), PARTITION_NUMBER, REPLICA_NUMBER,
+          REPLICA_NUMBER - 1, 1000000L, CrushEdRebalanceStrategy.class.getName());
+    }
+
+    // start controller
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, _controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (MockParticipantManager p : _participants) {
+      if (p.isConnected()) {
+        p.syncStop();
+      }
+    }
+    deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testP2PStateTransitionDisabled() throws InterruptedException {
+    enableP2PInCluster(CLUSTER_NAME, _configAccessor, false);
+
+    MockHelixTaskExecutor.resetStats();
+    // rolling upgrade the cluster
+    for (String ins : _instances) {
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, false);
+      Assert.assertTrue(_clusterVerifier.verifyByPolling());
+      verifyP2PDisabled();
+
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, true);
+      Assert.assertTrue(_clusterVerifier.verifyByPolling());
+      verifyP2PDisabled();
+    }
+
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
+        "There are duplicated transition messages sent while participant is handling the state-transition!");
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,
+        "There are duplicated transition messages sent at same time!");
+  }
+
+  @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
+  public void testP2PStateTransitionEnabled() throws InterruptedException {
+    enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
+    long startTime = System.currentTimeMillis();
+    MockHelixTaskExecutor.resetStats();
+    // rolling upgrade the cluster
+    for (String ins : _instances) {
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, false);
+      Assert.assertTrue(_clusterVerifier.verifyByPolling());
+      verifyP2PEnabled(startTime);
+
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, true);
+      Assert.assertTrue(_clusterVerifier.verifyByPolling());
+      verifyP2PEnabled(startTime);
+    }
+
+    double ratio = ((double) p2pTrigged) / ((double) total);
+    Assert.assertTrue(ratio > 0.7, String
+        .format("Only %d out of %d percent transitions to Master were triggered by expected host!",
+            p2pTrigged, total));
+
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
+        "There are duplicated transition messages sent while participant is handling the state-transition!");
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,
+        "There are duplicated transition messages sent at same time!");
+  }
+
+  private void verifyP2PDisabled() {
+    ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME);
+    dataCache.refresh(_accessor);
+    Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+
+    for (LiveInstance instance : liveInstanceMap.values()) {
+      Map<String, CurrentState> currentStateMap =
+          dataCache.getCurrentState(instance.getInstanceName(), instance.getSessionId());
+      Assert.assertNotNull(currentStateMap);
+      for (CurrentState currentState : currentStateMap.values()) {
+        for (String partition : currentState.getPartitionStateMap().keySet()) {
+          String state = currentState.getState(partition);
+          if (state.equalsIgnoreCase("MASTER")) {
+            String triggerHost = currentState.getTriggerHost(partition);
+            Assert.assertEquals(triggerHost, _controllerName,
+                state + " of " + partition + " on " + instance.getInstanceName()
+                    + " was triggered by " + triggerHost);
+          }
+        }
+      }
+    }
+  }
+
+  static int total = 0;
+  static int p2pTrigged = 0;
+
+  private void verifyP2PEnabled(long startTime) {
+    ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME);
+    dataCache.refresh(_accessor);
+    Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+
+    for (LiveInstance instance : liveInstanceMap.values()) {
+      Map<String, CurrentState> currentStateMap =
+          dataCache.getCurrentState(instance.getInstanceName(), instance.getSessionId());
+      Assert.assertNotNull(currentStateMap);
+      for (CurrentState currentState : currentStateMap.values()) {
+        for (String partition : currentState.getPartitionStateMap().keySet()) {
+          String state = currentState.getState(partition);
+          long start = currentState.getStartTime(partition);
+          if (state.equalsIgnoreCase("MASTER") && start > startTime) {
+            String triggerHost = currentState.getTriggerHost(partition);
+            if (!triggerHost.equals(_controllerName)) {
+              p2pTrigged ++;
+            }
+            total ++;
+          }
+        }
+      }
+    }
+  }
+
+
+  static class TestParticipantManager extends MockParticipantManager {
+    private final DefaultMessagingService _messagingService;
+
+    public TestParticipantManager(String zkAddr, String clusterName, String instanceName) {
+      super(zkAddr, clusterName, instanceName);
+      _messagingService = new MockMessagingService(this);
+    }
+
+    @Override
+    public ClusterMessagingService getMessagingService() {
+      // The caller can register message handler factories on messaging service before the
+      // helix manager is connected. Thus we do not do connected check here.
+      return _messagingService;
+    }
+  }
+
+  static class MockMessagingService extends DefaultMessagingService {
+    private final HelixTaskExecutor _taskExecutor;
+    ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded =
+        new ConcurrentHashMap<>();
+    private final HelixManager _manager;
+
+    public MockMessagingService(HelixManager manager) {
+      super(manager);
+      _manager = manager;
+
+      boolean isParticipant = false;
+      if (manager.getInstanceType() == InstanceType.PARTICIPANT
+          || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
+        isParticipant = true;
+      }
+
+      _taskExecutor = new MockHelixTaskExecutor(
+          new ParticipantStatusMonitor(isParticipant, manager.getInstanceName()),
+          new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName()));
+    }
+
+    @Override
+    public synchronized void registerMessageHandlerFactory(String type,
+        MessageHandlerFactory factory) {
+      registerMessageHandlerFactory(Collections.singletonList(type), factory);
+    }
+
+    @Override
+    public synchronized void registerMessageHandlerFactory(List<String> types,
+        MessageHandlerFactory factory) {
+      if (_manager.isConnected()) {
+        for (String type : types) {
+          registerMessageHandlerFactoryExtended(type, factory);
+        }
+      } else {
+        for (String type : types) {
+          _messageHandlerFactoriestobeAdded.put(type, factory);
+        }
+      }
+    }
+
+    public synchronized void onConnected() {
+      for (String type : _messageHandlerFactoriestobeAdded.keySet()) {
+        registerMessageHandlerFactoryExtended(type, _messageHandlerFactoriestobeAdded.get(type));
+      }
+      _messageHandlerFactoriestobeAdded.clear();
+    }
+
+    public HelixTaskExecutor getExecutor() {
+      return _taskExecutor;
+    }
+
+
+    void registerMessageHandlerFactoryExtended(String type, MessageHandlerFactory factory) {
+      int threadpoolSize = HelixTaskExecutor.DEFAULT_PARALLEL_TASKS;
+      _taskExecutor.registerMessageHandlerFactory(type, factory, threadpoolSize);
+      super.sendNopMessage();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
index 4083988..83893c6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
@@ -98,8 +98,6 @@ public class TestExpandCluster extends TestPartitionMigrationBase {
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     Assert.assertFalse(_migrationVerifier.hasLessReplica());
-    Assert.assertFalse(_migrationVerifier.hasMoreReplica());
-
     _migrationVerifier.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
new file mode 100644
index 0000000..207f74c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
@@ -0,0 +1,111 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
+
+public class MockHelixTaskExecutor extends HelixTaskExecutor {
+  public static int duplicatedMessages = 0;
+  public static int extraStateTransition = 0;
+  public static int duplicatedMessagesInProgress = 0;
+  HelixManager manager;
+
+  public MockHelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor,
+      MessageQueueMonitor messageQueueMonitor) {
+    super(participantStatusMonitor, messageQueueMonitor);
+  }
+
+  @Override
+  public void onMessage(String instanceName, List<Message> messages,
+      NotificationContext changeContext) {
+    manager = changeContext.getManager();
+    checkDuplicatedMessages(messages);
+    super.onMessage(instanceName, messages, changeContext);
+  }
+
+  void checkDuplicatedMessages(List<Message> messages) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey path = keyBuilder.currentStates(manager.getInstanceName(), manager.getSessionId());
+    Map<String, CurrentState> currentStateMap = accessor.getChildValuesMap(path);
+
+    Set<String> seenPartitions = new HashSet<>();
+    for (Message message : messages) {
+      if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
+        String resource = message.getResourceName();
+        String partition = message.getPartitionName();
+
+        //System.err.println(message.getMsgId());
+        String key = resource + "-" + partition;
+        if (seenPartitions.contains(key)) {
+          //System.err.println("Duplicated message received for " + resource + ":" + partition);
+          duplicatedMessages++;
+        }
+        seenPartitions.add(key);
+
+        String toState = message.getToState();
+        String state = null;
+        if (currentStateMap.containsKey(resource)) {
+          CurrentState currentState = currentStateMap.get(resource);
+          state = currentState.getState(partition);
+        }
+
+        if (toState.equals(state) && message.getMsgState() == Message.MessageState.NEW) {
+          //            logger.error(
+          //                "Extra message: " + message.getMsgId() + ", Partition is already in target state "
+          //                    + toState + " for " + resource + ":" + partition);
+          extraStateTransition++;
+        }
+
+        String messageTarget =
+            getMessageTarget(message.getResourceName(), message.getPartitionName());
+
+        if (message.getMsgState() == Message.MessageState.NEW &&
+            _messageTaskMap.containsKey(messageTarget)) {
+          String taskId = _messageTaskMap.get(messageTarget);
+          MessageTaskInfo messageTaskInfo = _taskMap.get(taskId);
+          Message existingMsg = messageTaskInfo.getTask().getMessage();
+          if (existingMsg.getMsgId() != message.getMsgId())
+            //            logger.error("Duplicated message In Progress: " + message.getMsgId()
+            //                    + ", state transition in progress with message " + existingMsg.getMsgId()
+            //                    + " to " + toState + " for " + resource + ":" + partition);
+            duplicatedMessagesInProgress ++;
+        }
+      }
+    }
+  }
+
+  public static void resetStats() {
+    duplicatedMessages = 0;
+    extraStateTransition = 0;
+    duplicatedMessagesInProgress = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index d5e40be..bc1e554 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -32,6 +32,11 @@ import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.IntermediateStateOutput;
 import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageSelectionStageOutput;
@@ -45,6 +50,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
@@ -156,7 +162,8 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     // Validate: Controller should not send S->M message to new master.
 
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
-    currentStateOutput.setPendingState(_db, _partition, initialMaster, toSlaveMessage);
+    currentStateOutput.setPendingMessage(_db, _partition, initialMaster, toSlaveMessage);
+    currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
 
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
@@ -175,7 +182,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     currentStateOutput =
         populateCurrentStateFromBestPossible(_bestpossibleState);
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
-    currentStateOutput.setPendingState(_db, _partition, secondMaster, relayMessage);
+    currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     _fullPipeline.handle(event);
@@ -221,7 +228,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     // The initial master has forwarded the p2p message to secondMaster and deleted original M->S message on initialMaster,
     // But the S->M state-transition has not completed yet in secondMaster.
     // Validate: Controller should not send S->M to thirdMaster.
-    currentStateOutput.setPendingState(_db, _partition, secondMaster, relayMessage);
+    currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);


[3/3] helix git commit: Temorary workaround to fix P2P race-condition for old helix participant (0.8.1 or older).

Posted by lx...@apache.org.
Temorary workaround to fix P2P race-condition for old helix participant (0.8.1 or older).


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/74145e8a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/74145e8a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/74145e8a

Branch: refs/heads/master
Commit: 74145e8ad3b34753186d53526bab825de4432c31
Parents: 880f885
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Jul 27 17:28:17 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 17 15:17:26 2018 -0700

----------------------------------------------------------------------
 .../common/caches/InstanceMessagesCache.java    | 98 ++++++++++++++++++--
 .../stages/CurrentStateComputationStage.java    |  9 +-
 .../controller/stages/TaskAssignmentStage.java  | 18 +++-
 .../messaging/handling/HelixTaskExecutor.java   | 12 ++-
 .../messaging/TestP2PNoDuplicatedMessage.java   | 13 ++-
 5 files changed, 120 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index 13b77cc..9fa9136 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -34,6 +34,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,10 +54,22 @@ public class InstanceMessagesCache {
   // <instance -> {<MessageId, Message>}>
   private Map<String, Map<String, Message>> _relayMessageCache = Maps.newHashMap();
 
+
+  // TODO: Temporary workaround to void participant receiving duplicated state transition messages when p2p is enable.
+  // should remove this once all clients are migrated to 0.8.2. -- Lei
+  private Map<String, Message> _committedRelayMessages = Maps.newHashMap();
+
+  public static final String COMMIT_MESSAGE_EXPIRY_CONFIG = "helix.controller.messagecache.commitmessageexpiry";
+
+  private static final int DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY = 20 * 1000; // 20 seconds
+  private final int _commitMessageExpiry;
+
   private String _clusterName;
 
   public InstanceMessagesCache(String clusterName) {
     _clusterName = clusterName;
+    _commitMessageExpiry = HelixUtil
+        .getSystemPropertyAsInt(COMMIT_MESSAGE_EXPIRY_CONFIG, DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY);
   }
 
   /**
@@ -203,14 +216,27 @@ public class InstanceMessagesCache {
         String targetState = message.getToState();
         String instanceSessionId = liveInstanceMap.get(instance).getSessionId();
 
-        if (_messageMap.get(instance).containsKey(message.getMsgId())) {
-          // relay message has already been sent to target host
-          // remove the message from relayMessageCache.
-          LOG.info(
-              "Relay message has already been sent to target host, remove relay message from the cache"
-                  + message.getId());
-          iterator.remove();
-          continue;
+        Map<String, Message> instanceMsgMap = _messageMap.get(instance);
+
+        if (instanceMsgMap != null && instanceMsgMap.containsKey(message.getMsgId())) {
+          Message commitMessage = instanceMsgMap.get(message.getMsgId());
+
+          if (!commitMessage.isRelayMessage()) {
+            LOG.info(
+                "Controller already sent the message to the target host, remove relay message from the cache"
+                    + message.getId());
+            iterator.remove();
+            _committedRelayMessages.remove(message.getMsgId());
+            continue;
+          } else {
+            // relay message has already been sent to target host
+            // remember when the relay messages get relayed to the target host.
+            if (!_committedRelayMessages.containsKey(message.getMsgId())) {
+              message.setRelayTime(System.currentTimeMillis());
+              _committedRelayMessages.put(message.getMsgId(), message);
+              LOG.info("Put message into committed relay messages " + message.getId());
+            }
+          }
         }
 
         if (!instanceSessionId.equals(sessionId)) {
@@ -227,6 +253,7 @@ public class InstanceMessagesCache {
               .getId());
           continue;
         }
+
         CurrentState currentState = sessionCurrentStateMap.get(resourceName);
         if (currentState != null && targetState.equals(currentState.getState(partitionName))) {
           LOG.info("CurrentState " + currentState
@@ -237,8 +264,8 @@ public class InstanceMessagesCache {
         }
 
         if (message.isExpired()) {
-          LOG.info("relay message " + message.getId() + " expired, remove it from cache."
-              + message.getId());
+          LOG.error("relay message has not been sent " + message.getId()
+              + " expired, remove it from cache. relay time: " + message.getRelayTime());
           iterator.remove();
           continue;
         }
@@ -251,6 +278,55 @@ public class InstanceMessagesCache {
     }
 
     _relayMessageMap = Collections.unmodifiableMap(relayMessageMap);
+
+    // TODO: this is a workaround, remove this once the participants are all in 0.8.2,
+    checkCommittedRelayMessages(currentStateMap);
+
+  }
+
+  // TODO: this is a workaround, once the participants are all in 0.8.2,
+  private void checkCommittedRelayMessages(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
+    Iterator<Map.Entry<String, Message>> it = _committedRelayMessages.entrySet().iterator();
+    while (it.hasNext()) {
+      Message message = it.next().getValue();
+
+      String resourceName = message.getResourceName();
+      String partitionName = message.getPartitionName();
+      String targetState = message.getToState();
+      String instance = message.getTgtName();
+      String sessionId = message.getTgtSessionId();
+
+      long committedTime = message.getRelayTime();
+      if (committedTime + _commitMessageExpiry < System.currentTimeMillis()) {
+        LOG.info("relay message " + message.getMsgId()
+            + " is expired after committed, remove it from committed message cache.");
+        it.remove();
+        continue;
+      }
+
+      Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+          currentStateMap.get(instance);
+      if (instanceCurrentStateMap == null || instanceCurrentStateMap.get(sessionId) == null) {
+        LOG.info(
+            "No sessionCurrentStateMap found, remove it from committed message cache." + message
+                .getId());
+        it.remove();
+        continue;
+      }
+
+      Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
+      CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+      if (currentState != null && targetState.equals(currentState.getState(partitionName))) {
+        LOG.info("CurrentState " + currentState
+            + " match the target state of the relay message, remove it from committed message cache."
+            + message.getId());
+        it.remove();
+        continue;
+      }
+
+      Map<String, Message> cachedMap = _messageMap.get(message.getTgtName());
+      cachedMap.put(message.getId(), message);
+    }
   }
 
   /**
@@ -303,6 +379,8 @@ public class InstanceMessagesCache {
       _relayMessageCache.put(instanceName, Maps.<String, Message>newHashMap());
     }
     _relayMessageCache.get(instanceName).put(message.getId(), message);
+
+    LOG.info("Add message to relay cache " + message.getMsgId());
   }
 
   @Override public String toString() {

http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 9cc9506..340a051 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -164,8 +164,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
               message.getMsgId(), resourceName, message.getPartitionName()));
         }
       } else {
-        LogUtil.logWarn(LOG, _eventId, String
-            .format("A relay message %s should not be batched, ignored!", message.getMsgId()));
+        LogUtil.logWarn(LOG, _eventId,
+            String.format("A relay message %s should not be batched, ignored!", message.getMsgId()));
       }
     }
   }
@@ -275,8 +275,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     // Check whether it is already passed threshold
     for (String resourceName : missingTopStateMap.keySet()) {
       for (String partitionName : missingTopStateMap.get(resourceName).keySet()) {
-        long startTime = missingTopStateMap.get(resourceName).get(partitionName);
-        if (startTime > 0 && System.currentTimeMillis() - startTime > durationThreshold) {
+        Long startTime = missingTopStateMap.get(resourceName).get(partitionName);
+        if (startTime != null && startTime > 0
+            && System.currentTimeMillis() - startTime > durationThreshold) {
           missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED);
           if (clusterStatusMonitor != null) {
             clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false);

http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 6036f34..6d483a0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -74,7 +74,8 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     List<Message> outputMessages =
         batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
             manager.getProperties());
-    sendMessages(dataAccessor, outputMessages);
+
+    List<Message> messagesSent = sendMessages(dataAccessor, outputMessages);
     // TODO: Need also count messages from task rebalancer
     if (!cache.isTaskCache()) {
       ClusterStatusMonitor clusterStatusMonitor =
@@ -84,7 +85,7 @@ public class TaskAssignmentStage extends AbstractBaseStage {
       }
     }
     long cacheStart = System.currentTimeMillis();
-    cache.cacheMessages(outputMessages);
+    cache.cacheMessages(messagesSent);
     long cacheEnd = System.currentTimeMillis();
     LogUtil.logDebug(logger, _eventId, "Caching messages took " + (cacheEnd - cacheStart) + " ms");
   }
@@ -132,9 +133,11 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     return outputMessages;
   }
 
-  protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
+  // return the messages actually sent
+  protected List<Message> sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
+    List<Message> messageSent = new ArrayList<>();
     if (messages == null || messages.isEmpty()) {
-      return;
+      return messageSent;
     }
 
     Builder keyBuilder = dataAccessor.keyBuilder();
@@ -146,6 +149,7 @@ public class TaskAssignmentStage extends AbstractBaseStage {
               + message.getResourceName() + "." + message.getPartitionName() + "|" + message
               .getPartitionNames() + " from:" + message.getFromState() + " to:" + message
               .getToState() + ", relayMessages: " + message.getRelayMessages().size());
+
       if (message.hasRelayMessages()) {
         for (Message msg : message.getRelayMessages().values()) {
           LogUtil.logInfo(logger, _eventId,
@@ -163,8 +167,12 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     boolean[] results = dataAccessor.createChildren(keys, new ArrayList<>(messages));
     for (int i = 0; i < results.length; i++) {
       if (!results[i]) {
-        LogUtil.logWarn(logger, _eventId, "Failed to send message: " + keys.get(i));
+        LogUtil.logError(logger, _eventId, "Failed to send message: " + keys.get(i));
+      } else {
+        messageSent.add(messages.get(i));
       }
     }
+
+    return messageSent;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 925f127..5e2082c 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -874,12 +874,16 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
           } else if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
               && isStateTransitionInProgress(messageTarget)) {
 
+            String taskId = _messageTaskMap.get(messageTarget);
+            Message msg = _taskMap.get(taskId).getTask().getMessage();
+
             // If there is another state transition for same partition is going on,
             // discard the message. Controller will resend if this is a valid message
-            throw new HelixException(String
-                .format("Another state transition for %s:%s is in progress. Discarding %s->%s message",
-                    message.getResourceName(), message.getPartitionName(), message.getFromState(),
-                    message.getToState()));
+            throw new HelixException(String.format(
+                "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
+                message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
+                msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
+                message.getToState()));
           }
 
           stateTransitionHandlers

http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
index b3ef3e5..bf7f566 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -80,8 +80,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
   HelixDataAccessor _accessor;
 
   @BeforeClass
-  public void beforeClass()
-      throws InterruptedException {
+  public void beforeClass() {
     System.out.println(
         "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
 
@@ -136,7 +135,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
   }
 
   @Test
-  public void testP2PStateTransitionDisabled() throws InterruptedException {
+  public void testP2PStateTransitionDisabled() {
     enableP2PInCluster(CLUSTER_NAME, _configAccessor, false);
 
     MockHelixTaskExecutor.resetStats();
@@ -158,7 +157,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
   }
 
   @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
-  public void testP2PStateTransitionEnabled() throws InterruptedException {
+  public void testP2PStateTransitionEnabled() {
     enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
     long startTime = System.currentTimeMillis();
     MockHelixTaskExecutor.resetStats();
@@ -174,9 +173,9 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
     }
 
     double ratio = ((double) p2pTrigged) / ((double) total);
-    Assert.assertTrue(ratio > 0.7, String
-        .format("Only %d out of %d percent transitions to Master were triggered by expected host!",
-            p2pTrigged, total));
+    Assert.assertTrue(ratio > 0.6, String
+       .format("Only %d out of %d percent transitions to Master were triggered by expected host!",
+           p2pTrigged, total));
 
     Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
         "There are duplicated transition messages sent while participant is handling the state-transition!");