You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/06/24 23:49:22 UTC

[helix] branch master updated (5b5f95e -> 7ac3a6a)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git.


    from 5b5f95e  Disable JavaDoc check in pom.xml
     new cc14cfd  Introduce ZkPathStatRecord to record watch reinstall in ZkClient.
     new 1125870  Adding Zk data change callback propagation latency metric.
     new 9707b38  Upgrade ZK to 3.4.13
     new 1543896  Change state transition monitor to per cluster per state transition
     new 831593c  Remove workaround in sending S->M message when there is a same pending relay message.
     new 8abfe67  Fix check for disabled partitions
     new 0e6cccb  Change output behavior for non-exist instances
     new fe8596b  Fix http request hanging issue to the SN API
     new d6a9b1a  Catch exception and log error when helix-admin-webapp fails to read data from certain path
     new e30fe4b  Upgrade Apache rat version and add exclusion paths
     new 5db99fb  Remove relay message from controller's message cache immediately if the partition on relay host turned to ERROR state while transits off from top-state.
     new 0616e97  Enable default Jersey server metric reporting
     new f6f50c2  Fix compute IdealState mapping tool
     new ad0c2ed  Always try reading from EphemeralOwner state first while reading the session ID from a live instance node.
     new 7ac3a6a  Fix looping with keySet and modifying keySet same time

The 15 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../helix/webapp/resources/ResourceUtil.java       |  15 +-
 .../webapp/resources/SchedulerTasksResource.java   |   4 +-
 .../helix/webapp/TestResetPartitionState.java      |   2 +-
 helix-core/helix-core-0.8.5-SNAPSHOT.ivy           |   2 +-
 helix-core/pom.xml                                 |   2 +-
 .../helix/common/caches/CurrentStateCache.java     |   2 +-
 .../helix/common/caches/InstanceMessagesCache.java | 376 +++++++++++--------
 .../helix/controller/GenericHelixController.java   |   2 +-
 .../dataproviders/BaseControllerDataProvider.java  |   2 +-
 .../stages/CurrentStateComputationStage.java       |   6 +-
 .../controller/stages/MessageGenerationPhase.java  |   2 +-
 .../controller/stages/MessageSelectionStage.java   |  21 +-
 .../stages/ResourceComputationStage.java           |   2 +-
 .../stages/TopStateHandoffReportStage.java         |   8 +-
 .../manager/zk/DistributedLeaderElection.java      |  19 +-
 .../helix/manager/zk/ParticipantManager.java       |  21 +-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |   2 +-
 .../apache/helix/manager/zk/ZKHelixManager.java    |   4 +-
 .../helix/manager/zk/zookeeper/ZkClient.java       | 139 ++++---
 .../helix/messaging/DefaultMessagingService.java   |   2 +-
 .../java/org/apache/helix/model/LiveInstance.java  |  30 +-
 .../main/java/org/apache/helix/model/Message.java  |  22 +-
 .../helix/monitoring/StateTransitionContext.java   |   9 +-
 .../helix/monitoring/mbeans/ZkClientMonitor.java   |  33 +-
 .../monitoring/mbeans/ZkClientPathMonitor.java     |  54 ++-
 .../org/apache/helix/spectator/RoutingTable.java   |   2 +-
 .../helix/spectator/RoutingTableProvider.java      |   2 +-
 .../main/java/org/apache/helix/util/HelixUtil.java |   7 +
 .../apache/helix/util/InstanceValidationUtil.java  |  15 +-
 .../java/org/apache/helix/common/ZkTestBase.java   |  63 +++-
 .../controller/stages/TestRebalancePipeline.java   |  31 +-
 .../helix/integration/TestCleanupExternalView.java |   4 +-
 .../TestMessagePartitionStateMismatch.java         |   2 +-
 .../TestNoThrottleDisabledPartitions.java          |   6 +-
 .../helix/integration/TestResetPartitionState.java |   2 +-
 .../integration/TestSyncSessionToController.java   |  33 +-
 .../integration/manager/TestZkHelixAdmin.java      |  17 +-
 .../messaging/TestP2PMessageSemiAuto.java          |   2 +-
 .../messaging/TestP2PNoDuplicatedMessage.java      |   4 +-
 .../paticipant/TestInstanceCurrentState.java       |   2 +-
 .../paticipant/TestStateTransitionTimeout.java     |  20 +-
 .../TestStateTransitionTimeoutWithResource.java    |  23 +-
 .../helix/manager/zk/TestHandleNewSession.java     |  68 ++--
 .../apache/helix/manager/zk/TestRawZkClient.java   |  27 +-
 .../helix/manager/zk/TestZkClusterManager.java     |  10 +-
 .../messaging/p2pMessage/TestP2PMessages.java      | 415 +++++++++++++++++++++
 .../TestP2PMessagesAvoidDuplicatedMessage.java     |  72 +++-
 .../helix/monitoring/TestParticipantMonitor.java   |   8 +-
 .../monitoring/mbeans/TestZkClientMonitor.java     |  16 +-
 .../helix/util/TestIdealStateAssignment.java       |  11 +-
 .../helix/util/TestInstanceValidationUtil.java     |  15 +-
 .../TestIdealStateAssignment.NoIdealState.json     |  44 ++-
 .../helix/rest/client/CustomRestClientImpl.java    |  32 +-
 .../apache/helix/rest/server/HelixRestServer.java  |   3 +
 .../rest/server/json/cluster/ClusterTopology.java  |  10 +-
 .../server/resources/helix/InstancesAccessor.java  |  39 +-
 .../rest/server/service/ClusterServiceImpl.java    |   5 +-
 .../rest/server/service/InstanceServiceImpl.java   |   2 +-
 .../helix/rest/client/TestCustomRestClient.java    |  15 +
 .../helix/rest/server/TestClusterAccessor.java     |   5 +-
 .../rest/server/TestDefaultMonitoringMbeans.java   |  71 ++++
 .../helix/rest/server/TestInstancesAccessor.java   |  16 +-
 .../server/json/cluster/TestClusterTopology.java   |   6 +-
 .../server/json/instance/TestStoppableCheck.java   |   7 +-
 pom.xml                                            |  10 +-
 65 files changed, 1470 insertions(+), 453 deletions(-)
 create mode 100644 helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
 create mode 100644 helix-rest/src/test/java/org/apache/helix/rest/server/TestDefaultMonitoringMbeans.java


[helix] 08/15: Fix http request hanging issue to the SN API

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit fe8596bf29b6c289ae04289289610c5846845d38
Author: Yi Wang <yw...@linkedin.com>
AuthorDate: Mon Jun 3 11:15:54 2019 -0700

    Fix http request hanging issue to the SN API
    
    RB=1684758
    G=helix-reviewers
    A=jxue
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../helix/rest/client/CustomRestClientImpl.java    | 32 ++++++++++++----------
 .../helix/rest/client/TestCustomRestClient.java    | 15 ++++++++++
 .../server/json/instance/TestStoppableCheck.java   |  7 +++--
 3 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
index 5d75f6a..a52820b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
@@ -19,26 +19,27 @@ package org.apache.helix.rest.client;
  * under the License.
  */
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
-import org.apache.http.NameValuePair;
+import org.apache.http.HttpStatus;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.HttpClient;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
 import org.apache.http.client.methods.HttpPost;
-import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+
 class CustomRestClientImpl implements CustomRestClient {
   private static final Logger LOG = LoggerFactory.getLogger(CustomRestClient.class);
 
@@ -103,13 +104,15 @@ class CustomRestClientImpl implements CustomRestClient {
   protected JsonNode getJsonObject(HttpResponse httpResponse) throws IOException {
     HttpEntity httpEntity = httpResponse.getEntity();
     String str = EntityUtils.toString(httpEntity);
+    LOG.info("Converting Response Content {} to JsonNode", str);
     return OBJECT_MAPPER.readTree(str);
   }
 
   private Map<String, Boolean> handleResponse(HttpResponse httpResponse,
       JsonConverter jsonConverter) throws IOException {
     int status = httpResponse.getStatusLine().getStatusCode();
-    if (status == 200) {
+    if (status == HttpStatus.SC_OK) {
+      LOG.info("Expected HttpResponse statusCode: {}", HttpStatus.SC_OK);
       return jsonConverter.convert(getJsonObject(httpResponse));
     } else {
       throw new ClientProtocolException("Unexpected response status: " + status + ", reason: "
@@ -117,15 +120,16 @@ class CustomRestClientImpl implements CustomRestClient {
     }
   }
 
-  private HttpResponse post(String url, Map<String, String> payloads) throws IOException {
-    List<NameValuePair> params = payloads.entrySet().stream()
-        .map(entry -> new BasicNameValuePair(entry.getKey(), entry.getValue()))
-        .collect(Collectors.toList());
+  @VisibleForTesting
+  protected HttpResponse post(String url, Map<String, String> payloads) throws IOException {
     try {
       HttpPost postRequest = new HttpPost(url);
       postRequest.setHeader("Accept", ACCEPT_CONTENT_TYPE);
-      postRequest.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
-      LOG.info("Executing request {}", postRequest.getRequestLine());
+      StringEntity entity = new StringEntity(OBJECT_MAPPER.writeValueAsString(payloads),
+          ContentType.APPLICATION_JSON);
+      postRequest.setEntity(entity);
+      LOG.info("Executing request: {}, headers: {}, entity: {}", postRequest.getRequestLine(),
+          postRequest.getAllHeaders(), postRequest.getEntity());
       return _httpClient.execute(postRequest);
     } catch (IOException e) {
       LOG.error("Failed to perform customized health check. Is participant endpoint {} available?",
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
index c43578b..812c941 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
@@ -13,6 +13,7 @@ import org.apache.http.StatusLine;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.HttpClients;
 import org.junit.Assert;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
@@ -130,6 +131,20 @@ public class TestCustomRestClient {
         ImmutableList.of("db0", "db1"), Collections.emptyMap());
   }
 
+  @Test (description = "Validate if the post request has the correct format")
+  public void testPostRequestFormat() throws IOException {
+    // a popular echo server that echos all the inputs
+    // TODO: add a mock rest server
+    final String echoServer = "http://httpbin.org/post";
+    CustomRestClientImpl customRestClient = new CustomRestClientImpl(HttpClients.createDefault());
+    HttpResponse response = customRestClient.post(echoServer, Collections.emptyMap());
+    JsonNode json = customRestClient.getJsonObject(response);
+
+    Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK);
+    Assert.assertEquals(json.get("headers").get("Accept").asText(), "application/json");
+    Assert.assertEquals(json.get("data").asText(), "{}");
+  }
+
   private class MockCustomRestClient extends CustomRestClientImpl {
     private String _jsonResponse = "";
 
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java b/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java
index 0249b8f..43c219b 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java
@@ -31,7 +31,8 @@ public class TestStoppableCheck {
 
   @Test
   public void whenSerializingStoppableCheck() throws JsonProcessingException {
-    StoppableCheck stoppableCheck = new StoppableCheck(false, ImmutableList.of("check"), StoppableCheck.Category.HELIX_OWN_CHECK);
+    StoppableCheck stoppableCheck = new StoppableCheck(false, ImmutableList.of("check"),
+        StoppableCheck.Category.HELIX_OWN_CHECK);
 
     ObjectMapper mapper = new ObjectMapper();
     String result = mapper.writeValueAsString(stoppableCheck);
@@ -41,7 +42,9 @@ public class TestStoppableCheck {
 
   @Test
   public void testConstructorSortingOrder() {
-    StoppableCheck stoppableCheck = new StoppableCheck(ImmutableMap.of("a", true, "c", false, "b", false), StoppableCheck.Category.HELIX_OWN_CHECK);
+    StoppableCheck stoppableCheck =
+        new StoppableCheck(ImmutableMap.of("a", true, "c", false, "b", false),
+            StoppableCheck.Category.HELIX_OWN_CHECK);
     Assert.assertFalse(stoppableCheck.isStoppable());
     Assert.assertEquals(stoppableCheck.getFailedChecks(), ImmutableList.of("Helix:b", "Helix:c"));
   }


[helix] 02/15: Adding Zk data change callback propagation latency metric.

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 112587033881387b75ea298ba80fce3f9e67d5fe
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Tue May 21 19:01:16 2019 -0700

    Adding Zk data change callback propagation latency metric.
    
    Note that the latency metric only covers data change callbacks for now.
    To cover child change callbacks as well, we need to find a way to avoid the additional ZK access that is required to read the children nodes' stats. Added a TODO in the corresponding code block.
    
    RB=1674550
    G=helix-reviewers
    A=jxue,ksun
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../helix/manager/zk/zookeeper/ZkClient.java       | 39 ++++++++++++----
 .../helix/monitoring/mbeans/ZkClientMonitor.java   | 33 +++++++++----
 .../monitoring/mbeans/ZkClientPathMonitor.java     | 54 ++++++++++++++++------
 .../apache/helix/manager/zk/TestRawZkClient.java   | 27 +++++++++--
 .../monitoring/mbeans/TestZkClientMonitor.java     | 16 ++++++-
 5 files changed, 129 insertions(+), 40 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 8718303..418140b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -17,6 +17,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -131,9 +132,14 @@ public class ZkClient implements Watcher {
   }
 
   private class ZkPathStatRecord {
+    private final String _path;
     private Stat _stat = null;
     private boolean _checked = false;
 
+    public ZkPathStatRecord(String path) {
+      _path = path;
+    }
+
     public boolean pathExists() {
       return _stat != null;
     }
@@ -142,9 +148,19 @@ public class ZkClient implements Watcher {
       return _checked;
     }
 
-    public void recordPathStat(Stat stat) {
+    /*
+     * Note this method is not thread safe.
+     */
+    public void recordPathStat(Stat stat, OptionalLong notificationTime) {
       _checked = true;
       _stat = stat;
+
+      if (_monitor != null && stat != null && notificationTime.isPresent()) {
+        long updateTime = Math.max(stat.getCtime(), stat.getMtime());
+        if (notificationTime.getAsLong() > updateTime) {
+          _monitor.recordDataPropagationLatency(_path, notificationTime.getAsLong() - updateTime);
+        } // else, the node was updated again after the notification. Propagation latency is unavailable.
+      }
     }
   }
 
@@ -628,6 +644,7 @@ public class ZkClient implements Watcher {
 
   @Override
   public void process(WatchedEvent event) {
+    long notificationTime = System.currentTimeMillis();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Received event: " + event);
     }
@@ -662,7 +679,7 @@ public class ZkClient implements Watcher {
         processStateChanged(event);
       }
       if (dataChanged) {
-        processDataOrChildChange(event);
+        processDataOrChildChange(event, notificationTime);
       }
     } finally {
       if (stateChanged) {
@@ -701,7 +718,7 @@ public class ZkClient implements Watcher {
       fireChildChangedEvents(entry.getKey(), entry.getValue());
     }
     for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet()) {
-      fireDataChangedEvents(entry.getKey(), entry.getValue());
+      fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty());
     }
   }
 
@@ -943,13 +960,14 @@ public class ZkClient implements Watcher {
     }
   }
 
-  private void processDataOrChildChange(WatchedEvent event) {
+  private void processDataOrChildChange(WatchedEvent event, long notificationTime) {
     final String path = event.getPath();
 
     if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated
         || event.getType() == EventType.NodeDeleted) {
       Set<IZkChildListener> childListeners = _childListener.get(path);
       if (childListeners != null && !childListeners.isEmpty()) {
+        // TODO recording child changed event propagation latency as well. Note this change will introduce additional ZK access.
         fireChildChangedEvents(path, childListeners);
       }
     }
@@ -958,14 +976,15 @@ public class ZkClient implements Watcher {
         || event.getType() == EventType.NodeCreated) {
       Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
       if (listeners != null && !listeners.isEmpty()) {
-        fireDataChangedEvents(event.getPath(), listeners);
+        fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime));
       }
     }
   }
 
-  private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners) {
+  private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners,
+      final OptionalLong notificationTime) {
     try {
-      final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord();
+      final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
       // Trigger listener callbacks
       for (final IZkDataListenerEntry listener : listeners) {
         _eventThread.send(new ZkEvent(
@@ -975,7 +994,7 @@ public class ZkClient implements Watcher {
           public void run() throws Exception {
             // Reinstall watch before listener callbacks to check the znode status
             if (!pathStatRecord.pathChecked()) {
-              pathStatRecord.recordPathStat(getStat(path, true));
+              pathStatRecord.recordPathStat(getStat(path, true), notificationTime);
             }
             if (!pathStatRecord.pathExists()) {
               // no znode found at the path, trigger data deleted handler.
@@ -1006,14 +1025,14 @@ public class ZkClient implements Watcher {
 
   private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners) {
     try {
-      final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord();
+      final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
       for (final IZkChildListener listener : childListeners) {
         _eventThread.send(new ZkEvent("Children of " + path + " changed sent to " + listener) {
           @Override
           public void run() throws Exception {
             // Reinstall watch before listener callbacks to check the znode status
             if (!pathStatRecord.pathChecked()) {
-              pathStatRecord.recordPathStat(getStat(path, hasListeners(path)));
+              pathStatRecord.recordPathStat(getStat(path, hasListeners(path)), OptionalLong.empty());
             }
             List<String> children = null;
             if (pathStatRecord.pathExists()) {
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
index 0fe6911..88441be 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
@@ -24,6 +24,7 @@ import javax.management.MBeanAttributeInfo;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -152,17 +153,33 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
     }
   }
 
+  public void recordDataPropagationLatency(String path, long latencyMilliSec) {
+    if (null == path) {
+      return;
+    }
+    Arrays.stream(ZkClientPathMonitor.PredefinedPath.values())
+        .filter(predefinedPath -> predefinedPath.match(path))
+        .forEach(predefinedPath -> {
+      ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
+      if (zkClientPathMonitor != null) {
+        zkClientPathMonitor.recordDataPropagationLatency(latencyMilliSec);
+      }
+    });
+  }
+
   private void record(String path, int bytes, long latencyMilliSec, boolean isFailure,
       boolean isRead) {
-    for (ZkClientPathMonitor.PredefinedPath predefinedPath : ZkClientPathMonitor.PredefinedPath
-        .values()) {
-      if (predefinedPath.match(path)) {
-        ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
-        if (zkClientPathMonitor != null) {
-          zkClientPathMonitor.record(bytes, latencyMilliSec, isFailure, isRead);
-        }
-      }
+    if (null == path) {
+      return;
     }
+    Arrays.stream(ZkClientPathMonitor.PredefinedPath.values())
+        .filter(predefinedPath -> predefinedPath.match(path))
+        .forEach(predefinedPath -> {
+      ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
+      if (zkClientPathMonitor != null) {
+        zkClientPathMonitor.record(bytes, latencyMilliSec, isFailure, isRead);
+      }
+    });
   }
 
   public void record(String path, int dataSize, long startTimeMilliSec, AccessType accessType) {
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
index 5f4778c..6cd0cd5 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
@@ -75,7 +75,11 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
     ReadLatencyGauge,
     WriteLatencyGauge,
     ReadBytesGauge,
-    WriteBytesGauge
+    WriteBytesGauge,
+    /*
+     * The latency between a ZK data change happening in the server side and the client side getting notification.
+     */
+    DataPropagationLatencyGuage
   }
 
   private SimpleDynamicMetric<Long> _readCounter;
@@ -91,6 +95,7 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
   private HistogramDynamicMetric _writeLatencyGauge;
   private HistogramDynamicMetric _readBytesGauge;
   private HistogramDynamicMetric _writeBytesGauge;
+  private HistogramDynamicMetric _dataPropagationLatencyGauge;
 
   @Override
   public String getSensorName() {
@@ -107,23 +112,37 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
         .format("%s.%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), monitorType, monitorKey,
             path.name());
 
-    _writeTotalLatencyCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteTotalLatencyCounter.name(), 0l);
-    _readTotalLatencyCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadTotalLatencyCounter.name(), 0l);
-    _writeFailureCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteFailureCounter.name(), 0l);
-    _readFailureCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadFailureCounter.name(), 0l);
-    _writeBytesCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteBytesCounter.name(), 0l);
-    _readBytesCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadBytesCounter.name(), 0l);
+    _writeTotalLatencyCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.WriteTotalLatencyCounter.name(), 0l);
+    _readTotalLatencyCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.ReadTotalLatencyCounter.name(), 0l);
+    _writeFailureCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.WriteFailureCounter.name(), 0l);
+    _readFailureCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.ReadFailureCounter.name(), 0l);
+    _writeBytesCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.WriteBytesCounter.name(), 0l);
+    _readBytesCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.ReadBytesCounter.name(), 0l);
     _writeCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteCounter.name(), 0l);
     _readCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadCounter.name(), 0l);
 
-    _readLatencyGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadLatencyGauge.name(), new Histogram(
-        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
-    _writeLatencyGauge = new HistogramDynamicMetric(PredefinedMetricDomains.WriteLatencyGauge.name(), new Histogram(
-        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
-    _readBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadBytesGauge.name(), new Histogram(
-        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
-    _writeBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.WriteBytesGauge.name(), new Histogram(
-        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _readLatencyGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadLatencyGauge.name(),
+        new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _writeLatencyGauge =
+        new HistogramDynamicMetric(PredefinedMetricDomains.WriteLatencyGauge.name(), new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _readBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadBytesGauge.name(),
+        new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _writeBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.WriteBytesGauge.name(),
+        new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _dataPropagationLatencyGauge =
+        new HistogramDynamicMetric(PredefinedMetricDomains.DataPropagationLatencyGuage.name(),
+            new Histogram(new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS,
+                TimeUnit.MILLISECONDS)));
   }
 
   public ZkClientPathMonitor register() throws JMException {
@@ -140,6 +159,7 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
     attributeList.add(_writeLatencyGauge);
     attributeList.add(_readBytesGauge);
     attributeList.add(_writeBytesGauge);
+    attributeList.add(_dataPropagationLatencyGauge);
 
     ObjectName objectName = new ObjectName(String
         .format("%s,%s=%s", ZkClientMonitor.getObjectName(_type, _key, _instanceName).toString(),
@@ -162,6 +182,10 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
     }
   }
 
+  public void recordDataPropagationLatency(long latency) {
+    _dataPropagationLatencyGauge.updateValue(latency);
+  }
+
   private void increaseFailureCounter(boolean isRead) {
     if (isRead) {
       _readFailureCounter.updateValue(_readFailureCounter.getValue() + 1);
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index edbed63..ea67fbf 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -19,6 +19,8 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -26,8 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkServer;
@@ -40,6 +41,7 @@ import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
@@ -59,7 +61,6 @@ public class TestRawZkClient extends ZkUnitTestBase {
   @BeforeClass
   public void beforeClass() {
     _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
   }
 
   @AfterClass
@@ -86,7 +87,7 @@ public class TestRawZkClient extends ZkUnitTestBase {
     newStat = _zkClient.getStat(path);
     AssertJUnit.assertEquals(stat, newStat);
 
-    _zkClient.writeData(path, new ZNRecord("Test"));
+    _zkClient.writeData(path, "Test");
     newStat = _zkClient.getStat(path);
     AssertJUnit.assertNotSame(stat, newStat);
   }
@@ -265,10 +266,15 @@ public class TestRawZkClient extends ZkUnitTestBase {
     zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
       @Override
       public void handleDataChange(String dataPath, Object data) {
+        callbackLock();
       }
 
       @Override
       public void handleDataDeleted(String dataPath) {
+        callbackLock();
+      }
+
+      private void callbackLock() {
         lock.lock();
         try {
           callbackFinish.signal();
@@ -278,13 +284,24 @@ public class TestRawZkClient extends ZkUnitTestBase {
       }
     });
     lock.lock();
-    _zkClient.delete(TEST_PATH);
+    _zkClient.writeData(TEST_PATH, "Test");
     Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
     Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 1);
     Assert.assertEquals((long) beanServer.getAttribute(name, "OutstandingRequestGauge"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackCounter"), 1);
     Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackHandledCounter"), 1);
     Assert.assertEquals((long) beanServer.getAttribute(name, "PendingCallbackGauge"), 0);
+
+    // Simulate a delayed callback
+    int waitTime = 10;
+    Thread.sleep(waitTime);
+    lock.lock();
+    zkClient.process(new WatchedEvent(Watcher.Event.EventType.NodeDataChanged, null, TEST_PATH));
+    Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
+    Assert.assertTrue(
+        (long) beanServer.getAttribute(rootname, "DataPropagationLatencyGuage.Max") >= waitTime);
+
+    _zkClient.delete(TEST_PATH);
   }
 
   @Test(dependsOnMethods = "testZkClientMonitor")
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
index 988ef60..9c04913 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
@@ -19,10 +19,12 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
-import javax.management.*;
 
-import org.apache.helix.manager.zk.zookeeper.ZkEventThread;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -138,5 +140,15 @@ public class TestZkClientMonitor {
         .assertTrue((long) _beanServer.getAttribute(instancesName, "WriteLatencyGauge.Max") >= 10);
     Assert.assertTrue(
         (long) _beanServer.getAttribute(instancesName, "WriteTotalLatencyCounter") >= 10);
+
+    monitor
+        .recordDataPropagationLatency("TEST/INSTANCES/node_1/CURRENTSTATES/session_1/Resource", 5);
+    Assert
+        .assertEquals((long) _beanServer.getAttribute(rootName, "DataPropagationLatencyGuage.Max"), 5);
+    Assert.assertEquals(
+        (long) _beanServer.getAttribute(currentStateName, "DataPropagationLatencyGuage.Max"), 5);
+    Assert
+        .assertEquals((long) _beanServer.getAttribute(idealStateName, "DataPropagationLatencyGuage.Max"),
+            0);
   }
 }


[helix] 13/15: Fix compute IdealState mapping tool

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit f6f50c2ab72131be6072a74ec558a9147e979687
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Tue Jun 18 15:38:24 2019 -0700

    Fix compute IdealState mapping tool
    
    There is a bug in the IdealState mapping tool: it does not filter out instances that are live but disabled. Add this filtering logic and extra tests for it.
    
    RB=1706106
    BUG=HELIX-1974
    G=helix-reviewers
    A=hulee
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../main/java/org/apache/helix/util/HelixUtil.java |  7 ++++
 .../helix/util/TestIdealStateAssignment.java       | 11 ++++--
 .../TestIdealStateAssignment.NoIdealState.json     | 44 +++++++++++++++++++++-
 3 files changed, 56 insertions(+), 6 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index dafe08d..ef2e578 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyType;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -175,6 +176,12 @@ public final class HelixUtil {
             .getStateCountMap(liveInstances.size(), Integer.parseInt(idealState.getReplicas())),
         idealState.getMaxPartitionsPerInstance());
 
+    // Remove all disabled instances so that Helix will not consider them live.
+    List<String> disabledInstance =
+        instanceConfigs.stream().filter(enabled -> !enabled.getInstanceEnabled())
+            .map(InstanceConfig::getInstanceName).collect(Collectors.toList());
+    liveInstances.removeAll(disabledInstance);
+
     Map<String, List<String>> preferenceLists = strategy
         .computePartitionAssignment(allNodes, liveInstances,
             new HashMap<String, Map<String, String>>(), cache).getListFields();
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java b/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java
index 7d94b35..fa738b6 100644
--- a/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java
+++ b/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java
@@ -33,14 +33,17 @@ public class TestIdealStateAssignment {
   private static final String fileNamePrefix = "TestIdealStateAssignment";
 
   @Test(dataProvider = "IdealStateInput")
-  public void testIdealStateAssignment(String clusterName,
-      List<String> instances, List<String> partitions, String numReplicas, String stateModeDef,
-      String strategyName, Map<String, Map<String, String>> expectedMapping)
+  public void testIdealStateAssignment(String clusterName, List<String> instances,
+      List<String> partitions, String numReplicas, String stateModeDef, String strategyName,
+      Map<String, Map<String, String>> expectedMapping, List<String> disabledInstances)
       throws IllegalAccessException, InstantiationException, ClassNotFoundException {
     ClusterConfig clusterConfig = new ClusterConfig(clusterName);
     List<InstanceConfig> instanceConfigs = new ArrayList<>();
     for (String instance : instances) {
       instanceConfigs.add(new InstanceConfig(instance));
+      if (disabledInstances.contains(instance)) {
+        instanceConfigs.get(instanceConfigs.size() - 1).setInstanceEnabled(false);
+      }
     }
 
     IdealState idealState = new IdealState("TestResource");
@@ -58,7 +61,7 @@ public class TestIdealStateAssignment {
   public Object[][] getIdealStateInput() {
     final String[] inputs =
         { "ClusterName", "Instances", "Partitions", "NumReplica", "StateModelDef", "StrategyName",
-            "ExpectedMapping"
+            "ExpectedMapping", "DisabledInstances"
         };
     return TestInputLoader.loadTestInputs(fileNamePrefix + ".NoIdealState.json", inputs);
   }
diff --git a/helix-core/src/test/resources/TestIdealStateAssignment.NoIdealState.json b/helix-core/src/test/resources/TestIdealStateAssignment.NoIdealState.json
index 908c923..fc00de5 100644
--- a/helix-core/src/test/resources/TestIdealStateAssignment.NoIdealState.json
+++ b/helix-core/src/test/resources/TestIdealStateAssignment.NoIdealState.json
@@ -34,7 +34,8 @@
         "localhost_5" : "MASTER",
         "localhost_4" : "SLAVE"
       }
-    }
+    },
+    "DisabledInstances" : []
   },
   {
     "ClusterName": "TestCluster2",
@@ -79,6 +80,45 @@
         "localhost_1" : "SLAVE",
         "localhost_2" : "SLAVE"
       }
-    }
+    },
+    "DisabledInstances" : []
+  },
+  {
+    "ClusterName": "TestCluster3",
+    "Instances": [
+      "localhost_1",
+      "localhost_2",
+      "localhost_3",
+      "localhost_4",
+      "localhost_5"
+    ],
+    "Partitions": [
+      "0",
+      "1",
+      "2",
+      "3"
+    ],
+    "NumReplica": "2",
+    "StateModelDef": "MasterSlave",
+    "StrategyName": "org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy",
+    "ExpectedMapping" : {
+      "0" : {
+        "localhost_3" : "MASTER",
+        "localhost_5" : "SLAVE"
+      },
+      "1" : {
+        "localhost_1" : "MASTER",
+        "localhost_5" : "SLAVE"
+      },
+      "2" : {
+        "localhost_3" : "MASTER",
+        "localhost_1" : "SLAVE"
+      },
+      "3" : {
+        "localhost_5" : "MASTER",
+        "localhost_1" : "SLAVE"
+      }
+    },
+    "DisabledInstances" : ["localhost_2", "localhost_4"]
   }
 ]


[helix] 09/15: Catch exception and log error when helix-admin-webapp fails to read data from certain path

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit d6a9b1a69551e11957c01eda2e590cdd6864fd86
Author: Yi Wang <yw...@linkedin.com>
AuthorDate: Wed Apr 24 16:50:30 2019 -0700

    Catch exception and log error when helix-admin-webapp fails to read data from certain path
    
    RB=1644066
    BUG=https://jira01.corp.linkedin.com:8443/browse/EXC-114388
    G=helix-reviewers
    A=jxue
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../org/apache/helix/webapp/resources/ResourceUtil.java   | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
index f47d6db..78a760a 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
@@ -38,8 +39,12 @@ import org.codehaus.jackson.map.SerializationConfig;
 import org.restlet.Context;
 import org.restlet.Request;
 import org.restlet.data.Form;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 public class ResourceUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(ResourceUtil.class);
   private static final String EMPTY_ZNRECORD_STRING =
       objectToJson(ClusterRepresentationUtil.EMPTY_ZNRECORD);
 
@@ -134,8 +139,14 @@ public class ResourceUtil {
   }
 
   public static String readZkAsBytes(ZkClient zkclient, PropertyKey propertyKey) {
-    byte[] bytes = zkclient.readData(propertyKey.getPath());
-    return bytes == null ? EMPTY_ZNRECORD_STRING : new String(bytes);
+    try {
+      byte[] bytes = zkclient.readData(propertyKey.getPath());
+      return new String(bytes);
+    } catch (Exception e) {
+      String errorMessage = "Exception occurred when reading data from path: " + propertyKey.getPath();
+      LOG.error(errorMessage, e);
+      throw new HelixException(errorMessage, e);
+    }
   }
 
   static String extractSimpleFieldFromZNRecord(String recordStr, String key) {


[helix] 05/15: Remove workaround in sending S->M message when there is a same pending relay message.

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 831593c414e56695725da6ad99a0bd2128157be4
Author: Lei Xia <lx...@linkedin.com>
AuthorDate: Fri May 17 10:36:22 2019 -0700

    Remove workaround in sending S->M message when there is a same pending relay message.
    
    RB=1670732
    BUG=HELIX-1871
    G=helix-reviewers
    A=jjwang,jxue
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../controller/stages/MessageSelectionStage.java   | 16 ++---
 .../integration/manager/TestZkHelixAdmin.java      | 17 +++--
 .../paticipant/TestStateTransitionTimeout.java     | 20 ++++--
 .../TestStateTransitionTimeoutWithResource.java    | 23 ++++---
 .../TestP2PMessagesAvoidDuplicatedMessage.java     | 72 ++++++++++++++++++++--
 5 files changed, 121 insertions(+), 27 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 73c8901..9fd66d2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -184,7 +184,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
       }
 
       if (!messagesGroupByStateTransitPriority.containsKey(priority)) {
-        messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
+        messagesGroupByStateTransitPriority.put(priority, new ArrayList<>());
       }
       messagesGroupByStateTransitPriority.get(priority).add(message);
 
@@ -195,25 +195,25 @@ public class MessageSelectionStage extends AbstractBaseStage {
 
     // select messages
     for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
+      NextMessage:
       for (Message message : messageList) {
         String toState = message.getToState();
         String fromState = message.getFromState();
 
         if (toState.equals(stateModelDef.getTopState())) {
           // find if there are any pending relay messages match this message.
-          // if yes, rebuild the message to use the same message id from the original relay message.
+          // if the pending relay message targets the same host, we are fine to continue send the message,
+          // if it targets to different host, we should not send the message now (should send after the relay message gets expired).
           for (Message relayMsg : pendingRelayMessages) {
             if (relayMsg.getToState().equals(toState) && relayMsg.getFromState()
                 .equals(fromState)) {
-              if (relayMsg.getTgtName().equals(message.getTgtName())) {
-                message = new Message(message, relayMsg.getMsgId());
-              } else {
-                // if there are pending relay message that was sent to a different host than the current message
-                // we should not send the toState message now.
+              LOG.info(
+                  "There is pending relay message, pending relay message: {}", relayMsg);
+              if (!relayMsg.getTgtName().equals(message.getTgtName())) {
                 LOG.info(
                     "There is pending relay message to a different host, not send message: {}, pending relay message: {}",
                     message, relayMsg);
-                continue;
+                continue NextMessage;
               }
             }
           }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
index 5141a8d..49d01a8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration.manager;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.TestHelper;
@@ -73,6 +74,8 @@ public class TestZkHelixAdmin extends TaskTestBase {
     idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
     _admin.setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState);
 
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
     String workflowName = TestHelper.getTestMethodName();
     Workflow.Builder builder = new Workflow.Builder(workflowName);
     JobConfig.Builder jobBuilder =
@@ -84,8 +87,14 @@ public class TestZkHelixAdmin extends TaskTestBase {
     Thread.sleep(2000L);
     JobContext jobContext =
         _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
-    Assert.assertEquals(jobContext.getPartitionState(0), null);
-    Assert.assertEquals(jobContext.getPartitionState(1), TaskPartitionState.COMPLETED);
-    Assert.assertEquals(jobContext.getPartitionState(2), null);
+    int n = idealState.getNumPartitions();
+    for ( int i = 0; i < n; i++) {
+      String targetPartition = jobContext.getTargetForPartition(i);
+      if (targetPartition.equals("TestDB_0") || targetPartition.equals("TestDB_2")) {
+        Assert.assertEquals(jobContext.getPartitionState(i), null);
+      } else {
+        Assert.assertEquals(jobContext.getPartitionState(i), TaskPartitionState.COMPLETED);
+      }
+    }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
index 74cf9a2..59aa61e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -164,15 +165,26 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
     Assert.assertTrue(result);
     HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
 
+    TestHelper.verify(() -> verify(accessor, idealState, factories), 5000);
+    Assert.assertTrue(verify(accessor, idealState, factories));
+  }
+
+  private boolean verify(HelixDataAccessor accessor, IdealState idealState,
+      Map<String, SleepStateModelFactory> factoryMap) {
     Builder kb = accessor.keyBuilder();
     ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
     for (String p : idealState.getPartitionSet()) {
       String idealMaster = idealState.getPreferenceList(p).get(0);
-      Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
+      if (!ev.getStateMap(p).get(idealMaster).equals("ERROR")) {
+        return false;
+      }
 
-      TimeOutStateModel model = factories.get(idealMaster).getStateModel(TEST_DB, p);
-      Assert.assertEquals(model._errorCallcount, 1);
-      Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
+      TimeOutStateModel model = factoryMap.get(idealMaster).getStateModel(TEST_DB, p);
+      if (model._errorCallcount != 1 || model._error.getCode() != ErrorCode.TIMEOUT) {
+        return false;
+      }
     }
+
+    return true;
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
index 7cb3e75..c3bcf47 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
@@ -30,6 +30,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.config.RebalanceConfig;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
@@ -168,11 +169,12 @@ public class TestStateTransitionTimeoutWithResource extends ZkStandAloneCMTestBa
             .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
 
-    verify(TEST_DB);
+    TestHelper.verify(() -> verify(TEST_DB), 5000);
+    Assert.assertTrue(verify(TEST_DB));
   }
 
   @Test
-  public void testStateTransitionTimeoutByClusterLevel() throws InterruptedException {
+  public void testStateTransitionTimeoutByClusterLevel() throws Exception {
     _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB + 1, _PARTITIONS, STATE_MODEL);
     _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME, TEST_DB + 1, false);
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB  + 1, 3);
@@ -191,22 +193,29 @@ public class TestStateTransitionTimeoutWithResource extends ZkStandAloneCMTestBa
         ClusterStateVerifier
             .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
-    verify(TEST_DB + 1);
+
+    TestHelper.verify(() -> verify(TEST_DB + 1), 5000);
+    Assert.assertTrue(verify(TEST_DB + 1));
   }
 
-  private void verify(String dbName) {
+  private boolean verify(String dbName) {
     IdealState idealState =
         _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     ExternalView ev = accessor.getProperty(accessor.keyBuilder().externalView(dbName));
     for (String p : idealState.getPartitionSet()) {
       String idealMaster = idealState.getPreferenceList(p).get(0);
-      Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
+      if(!ev.getStateMap(p).get(idealMaster).equals("ERROR")) {
+        return false;
+      }
 
       TimeOutStateModel model = _factories.get(idealMaster).getStateModel(dbName, p);
-      Assert.assertEquals(model._errorCallcount, 1);
-      Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
+      if (model._errorCallcount != 1 || model._error.getCode() != ErrorCode.TIMEOUT) {
+        return false;
+      }
     }
+
+    return true;
   }
 
   private void setParticipants(String dbName) throws InterruptedException {
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 3980b36..5d8be22 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -150,11 +150,10 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
     Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
 
-    // Scenario 2:
+    // Scenario 2A:
     // Old master (initialMaster) completes the M->S transition,
     // but has not forward p2p message to new master (secondMaster) yet.
     // Validate: Controller should not send S->M message to new master.
-
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
     currentStateOutput.setPendingMessage(_db, _partition, initialMaster, toSlaveMessage);
     currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
@@ -168,11 +167,76 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     Assert.assertEquals(messages.size(), 0);
 
 
+    // Scenario 2B:
+    // Old master (initialMaster) completes the M->S transition,
+    // There is a pending p2p message to new master (secondMaster).
+    // Validate: Controller should send S->M message to new master at same time.
+
+    currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
+    currentStateOutput.getPendingMessageMap(_db, _partition).clear();
+    currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
+
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    _messagePipeline.handle(event);
+
+    messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(_db, _partition);
+    Assert.assertEquals(messages.size(), 2);
+
+    boolean hasToOffline = false;
+    boolean hasToMaster = false;
+    for (Message msg : messages) {
+      if (msg.getToState().equals(MasterSlaveSMD.States.MASTER.name()) && msg.getTgtName()
+          .equals(secondMaster)) {
+        hasToMaster = true;
+      }
+      if (msg.getToState().equals(MasterSlaveSMD.States.OFFLINE.name()) && msg.getTgtName()
+          .equals(initialMaster)) {
+        hasToOffline = true;
+      }
+    }
+    Assert.assertTrue(hasToMaster);
+    Assert.assertTrue(hasToOffline);
+
+    // Secenario 2C
+    // Old master (initialMaster) completes the M->S transition,
+    // There is a pending p2p message to new master (secondMaster).
+    // However, the new master has been changed in bestPossible
+    // Validate: Controller should not send S->M message to the third master at same time.
+
+    String thirdMaster =
+        getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition),
+            MasterSlaveSMD.States.SLAVE.name());
+
+    Map<String, String> instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
+    instanceStateMap.put(secondMaster, "SLAVE");
+    instanceStateMap.put(thirdMaster, "MASTER");
+    _bestpossibleState.setState(_db, _partition, instanceStateMap);
+
+
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
+
+    _messagePipeline.handle(event);
+
+    messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(_db, _partition);
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertTrue(messages.get(0).getToState().equals("OFFLINE"));
+    Assert.assertTrue(messages.get(0).getTgtName().equals(initialMaster));
+
+
     // Scenario 3:
     // Old master (initialMaster) completes the M->S transition,
     // and has already forwarded p2p message to new master (secondMaster)
     // The original S->M message sent to old master has been removed.
     // Validate: Controller should send S->O to old master, but not S->M message to new master.
+    instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
+    instanceStateMap.put(secondMaster, "MASTER");
+    instanceStateMap.put(thirdMaster, "SLAVE");
+    _bestpossibleState.setState(_db, _partition, instanceStateMap);
+
     currentStateOutput =
         populateCurrentStateFromBestPossible(_bestpossibleState);
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
@@ -199,11 +263,11 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
-    String thirdMaster =
+    thirdMaster =
         getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition),
             MasterSlaveSMD.States.SLAVE.name());
 
-    Map<String, String> instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
+    instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition);
     instanceStateMap.put(secondMaster, "SLAVE");
     instanceStateMap.put(thirdMaster, "MASTER");
     _bestpossibleState.setState(_db, _partition, instanceStateMap);


[helix] 04/15: Change state transition monitor to per cluster per state transition

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 15438960debf58d3e5d46dca2aa4de5027ebb86c
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Thu May 23 12:36:59 2019 -0700

    Change state transition monitor to per cluster per state transition
    
    The existing state transition metrics are recorded per cluster, per resource, and per state transition. For a large cluster containing many resources, this may produce a tremendous number of metrics on the participant side.
    
    This RB changes the metrics to per cluster, per state transition, which should be sufficient for monitoring purposes.
    
    RB=1677106
    
    RB=1677106
    BUG=HELIX-1890
    G=helix-reviewers
    A=jjwang
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../java/org/apache/helix/monitoring/StateTransitionContext.java | 9 +++------
 .../java/org/apache/helix/monitoring/TestParticipantMonitor.java | 8 +++++---
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java
index eea15c0..8be4e78 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionContext.java
@@ -25,6 +25,8 @@ public class StateTransitionContext {
   private final String _instanceName;
   private final String _transition;
 
+  // To keep it backward compatible, we still have the constructor with instance name and resource
+  // name as input. But it will not be used in generating metrics object name.
   public StateTransitionContext(String clusterName, String instanceName, String resourceName,
       String transition) {
     _clusterName = clusterName;
@@ -57,9 +59,6 @@ public class StateTransitionContext {
 
     StateTransitionContext otherCxt = (StateTransitionContext) other;
     return _clusterName.equals(otherCxt.getClusterName())
-        &&
-        // _instanceName.equals(otherCxt.getInstanceName()) &&
-        _resourceName.equals(otherCxt.getResourceName())
         && _transition.equals(otherCxt.getTransition());
   }
 
@@ -70,9 +69,7 @@ public class StateTransitionContext {
   }
 
   public String toString() {
-    return "Cluster=" + _clusterName + "," +
-    // "instance=" + _instanceName + "," +
-        "Resource=" + _resourceName + "," + "Transition=" + _transition;
+    return "Cluster=" + _clusterName + "," + "Transition=" + _transition;
   }
 
 }
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
index bc9de01..b3de72d 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
@@ -142,9 +142,10 @@ public class TestParticipantMonitor {
     monitor.reportTransitionStat(cxt2, data);
     monitor.reportTransitionStat(cxt2, data);
     Thread.sleep(1000);
-    Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 2);
+    // Only one metric will be generated for db_1 and db_2
+    Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 1);
 
-    Assert.assertFalse(cxt.equals(cxt2));
+    Assert.assertTrue(cxt.equals(cxt2));
     Assert.assertFalse(cxt.equals(new Object()));
     Assert.assertTrue(cxt.equals(new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b")));
 
@@ -154,7 +155,8 @@ public class TestParticipantMonitor {
         new ParticipantMonitorListener("CLMParticipantReport");
 
     Thread.sleep(1000);
-    Assert.assertEquals(monitorListener2._beanValueMap.size(), monitorNum + 2);
+    // Same here. Helix only measures per cluster + per state transitions.
+    Assert.assertEquals(monitorListener2._beanValueMap.size(), monitorNum + 1);
 
     monitorListener2.disconnect();
     monitorListener.disconnect();


[helix] 12/15: Enable default Jersey server metric reporting

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 0616e972b318c66fdd8f5ce787fc8670aa9459dd
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Wed Jun 12 18:39:50 2019 -0700

    Enable default Jersey server metric reporting
    
    For monitoring Helix REST, we can support both REST server monitoring and customized logic monitoring.
    In this rb, we enable the Jersey server monitoring metrics and add tests for them.
    
    RB=1701238
    BUG=HELIX-1963
    G=helix-reviewers
    A=ywang4
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../apache/helix/rest/server/HelixRestServer.java  |  3 +
 .../rest/server/TestDefaultMonitoringMbeans.java   | 71 ++++++++++++++++++++++
 2 files changed, 74 insertions(+)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
index eea4501..af49395 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
@@ -43,6 +43,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.server.ServerProperties;
 import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -140,6 +141,8 @@ public class HelixRestServer {
     ResourceConfig cfg = new ResourceConfig();
     cfg.packages(type.getServletPackageArray());
 
+    // Enable the default statistical monitoring MBean for Jersey server
+    cfg.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true);
     cfg.property(ContextPropertyKeys.SERVER_CONTEXT.name(),
         new ServerContext(namespace.getMetadataStoreAddress()));
     if (type == ServletType.DEFAULT_SERVLET) {
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestDefaultMonitoringMbeans.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestDefaultMonitoringMbeans.java
new file mode 100644
index 0000000..90bdf17
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestDefaultMonitoringMbeans.java
@@ -0,0 +1,71 @@
+package org.apache.helix.rest.server;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import java.util.Random;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import javax.ws.rs.core.Response;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDefaultMonitoringMbeans extends AbstractTestClass {
+
+  // In the full testing environment there can be 2 - 4 REST servers running at once, so we don't
+  // know which REST server received the requests and reported the count. We therefore have to
+  // loop over all of the MBeans to find the reported data.
+  @Test
+  public void testDefaultMonitoringMbeans()
+      throws MBeanException, ReflectionException, InstanceNotFoundException, InterruptedException {
+    int listClusters = new Random().nextInt(10);
+    for (int i = 0; i < listClusters; i++) {
+      get("clusters", null, Response.Status.OK.getStatusCode(), true);
+    }
+
+    MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
+    boolean correctReports = false;
+
+    // It may take a couple of milliseconds to propagate the data to the MBeanServer
+    while (!correctReports) {
+      for (ObjectName objectName : beanServer.queryNames(null, null)) {
+        if (objectName.toString().contains("getClusters")) {
+          // The object name is complicated, so we get the matched one and try to find out whether
+          // they have the expected attributes and value matched our expectation.
+          try {
+            if (beanServer.getAttribute(objectName, "RequestCount_total")
+                .equals(Long.valueOf(listClusters))) {
+              correctReports = true;
+            }
+          } catch (AttributeNotFoundException e) {
+
+          }
+        }
+      }
+      Thread.sleep(50);
+    }
+
+    Assert.assertTrue(correctReports);
+  }
+}


[helix] 07/15: Change output behavior for non-exist instances

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 0e6cccbaf2e7a0c0fd4c1e3da6d5df349f3dedf6
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Thu May 30 16:47:16 2019 -0700

    Change output behavior for non-exist instances
    
    Currently, a non-existing instance is simply not shown in the output, so it is hard for the user to tell whether the instance does not exist or just does not belong to the same zone.
    
    Add the logic to check instances exists in instance list or not.
    
    RB=1684700
    BUG=HELIX-1911
    G=helix-reviewers
    A=hulee
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../rest/server/json/cluster/ClusterTopology.java  | 10 +++++-
 .../server/resources/helix/InstancesAccessor.java  | 39 ++++++++++++++++------
 .../rest/server/service/ClusterServiceImpl.java    |  5 ++-
 .../helix/rest/server/TestClusterAccessor.java     |  5 ++-
 .../helix/rest/server/TestInstancesAccessor.java   |  7 ++--
 .../server/json/cluster/TestClusterTopology.java   |  6 ++--
 6 files changed, 54 insertions(+), 18 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/ClusterTopology.java b/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/ClusterTopology.java
index 8325822..701e4bd 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/ClusterTopology.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/ClusterTopology.java
@@ -23,10 +23,14 @@ public class ClusterTopology {
   private final String clusterId;
   @JsonProperty("zones")
   private List<Zone> zones;
+  @JsonProperty("allInstances")
+  private Set<String> allInstances;
 
-  public ClusterTopology(String clusterId, List<Zone> zones) {
+
+  public ClusterTopology(String clusterId, List<Zone> zones, Set<String> allInstances) {
     this.clusterId = clusterId;
     this.zones = zones;
+    this.allInstances = allInstances;
   }
 
   public String getClusterId() {
@@ -37,6 +41,10 @@ public class ClusterTopology {
     return zones;
   }
 
+  public Set<String> getAllInstances() {
+    return allInstances;
+  }
+
   public static final class Zone {
     @JsonProperty("id")
     private final String id;
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index 99449be..36e7249 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -24,6 +24,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.rest.common.HelixDataAccessorWrapper;
+import org.apache.helix.rest.server.json.cluster.ClusterTopology;
 import org.apache.helix.rest.server.json.instance.StoppableCheck;
 import org.apache.helix.rest.server.resources.exceptions.HelixHealthException;
 import org.apache.helix.rest.server.service.ClusterService;
@@ -40,7 +41,10 @@ import org.slf4j.LoggerFactory;
 @Path("/clusters/{clusterId}/instances")
 public class InstancesAccessor extends AbstractHelixResource {
   private final static Logger _logger = LoggerFactory.getLogger(InstancesAccessor.class);
-
+  // This type is not a real HealthCheck failure reason. Also, if we added this type to the
+  // HealthCheck enum, it could introduce an unnecessary check step, since InstanceServiceImpl
+  // loops over all the types to do the corresponding checks.
+  private final static String INSTANCE_NOT_EXIST = "Helix:INSTANCE_NOT_EXIST";
   public enum InstancesProperties {
     instances,
     online,
@@ -57,7 +61,6 @@ public class InstancesAccessor extends AbstractHelixResource {
     zone_based
   }
 
-
   @GET
   public Response getAllInstances(@PathParam("clusterId") String clusterId) {
     HelixDataAccessor accessor = getDataAccssor(clusterId);
@@ -174,9 +177,13 @@ public class InstancesAccessor extends AbstractHelixResource {
           InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
       InstanceService instanceService =
           new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) getDataAccssor(clusterId)), getConfigAccessor());
+      ClusterService clusterService =
+          new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
+      ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
       switch (selectionBase) {
       case zone_based:
-        List<String> zoneBasedInstance = getZoneBasedInstances(clusterId, instances, orderOfZone);
+        List<String> zoneBasedInstance =
+            getZoneBasedInstances(instances, orderOfZone, clusterTopology.toZoneMapping());
         for (String instance : zoneBasedInstance) {
           StoppableCheck stoppableCheckResult =
               instanceService.getInstanceStoppableCheck(clusterId, instance, customizedInput);
@@ -189,6 +196,22 @@ public class InstancesAccessor extends AbstractHelixResource {
             stoppableInstances.add(instance);
           }
         }
+
+        // The following logic checks whether the requested instances exist. An instance may not
+        // exist in the following scenarios:
+        // 1. Instance got dropped. (InstanceConfig is gone.)
+        // 2. Instance name has a typo.
+
+        // Without this check, an instance that does not exist would silently disappear from the
+        // result, since Helix skips instances that are not in the selected zone. The user may get
+        // confused by the output.
+        Set<String> nonSelectedInstances = new HashSet<>(instances);
+        nonSelectedInstances.removeAll(clusterTopology.getAllInstances());
+        for (String nonSelectedInstance : nonSelectedInstances) {
+          ArrayNode failedReasonsNode = failedStoppableInstances.putArray(nonSelectedInstance);
+          failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST));
+        }
+
         break;
       case instance_based:
       default:
@@ -214,17 +237,13 @@ public class InstancesAccessor extends AbstractHelixResource {
    * The order of zones can directly come from user input. If user did not specify it, Helix will order
    * zones with alphabetical order.
    *
-   * @param clusterId
    * @param instances
    * @param orderedZones
    * @return
    */
-  private List<String> getZoneBasedInstances(String clusterId, List<String> instances,
-      List<String> orderedZones) {
-    ClusterService clusterService =
-        new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
-    Map<String, Set<String>> zoneMapping =
-        clusterService.getClusterTopology(clusterId).toZoneMapping();
+  private List<String> getZoneBasedInstances(List<String> instances, List<String> orderedZones,
+      Map<String, Set<String>> zoneMapping) {
+
     if (orderedZones == null) {
       orderedZones = new ArrayList<>(zoneMapping.keySet());
     }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java
index 5fb9c61..9f20a02 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java
@@ -5,6 +5,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import java.util.stream.Collectors;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
@@ -56,7 +57,9 @@ public class ClusterServiceImpl implements ClusterService {
       zones.add(zone);
     }
 
-    return new ClusterTopology(cluster, zones);
+    // Get all the instances names
+    return new ClusterTopology(cluster, zones,
+        instanceConfigs.stream().map(InstanceConfig::getInstanceName).collect(Collectors.toSet()));
   }
 
   @Override
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index d07c55f..ab43ae0 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -113,7 +113,10 @@ public class TestClusterAccessor extends AbstractTestClass {
     String response = new JerseyUriRequestBuilder("clusters/{}/topology").format(cluster).get(this);
 
     Assert.assertEquals(response,
-        "{\"id\":\"TestCluster_1\",\"zones\":[{\"id\":\"123\",\"instances\":[{\"id\":\"TestCluster_1localhost_12920\"}]}]}");
+        "{\"id\":\"TestCluster_1\",\"zones\":[{\"id\":\"123\",\"instances\":[{\"id\":\"TestCluster_1localhost_12920\"}]}],"
+            + "\"allInstances\":[\"TestCluster_1localhost_12918\",\"TestCluster_1localhost_12919\",\"TestCluster_1localhost_12924\","
+            + "\"TestCluster_1localhost_12925\",\"TestCluster_1localhost_12926\",\"TestCluster_1localhost_12927\",\"TestCluster_1localhost_12920\","
+            + "\"TestCluster_1localhost_12921\",\"TestCluster_1localhost_12922\",\"TestCluster_1localhost_12923\"]}");
   }
 
   @Test(dependsOnMethods = "testGetClusterTopology")
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 0e86584..78bce96 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -30,11 +30,11 @@ public class TestInstancesAccessor extends AbstractTestClass {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
     // Select instances with zone based
     String content =
-        String.format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\"]}",
+        String.format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"]}",
             InstancesAccessor.InstancesProperties.selection_base.name(),
             InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(),
             InstancesAccessor.InstancesProperties.instances.name(), "instance0", "instance1",
-            "instance2", "instance3", "instance4", "instance5");
+            "instance2", "instance3", "instance4", "instance5", "invalidInstance");
     Response response = new JerseyUriRequestBuilder("clusters/{}/instances?command=stoppable")
         .format(STOPPABLE_CLUSTER)
         .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
@@ -45,7 +45,8 @@ public class TestInstancesAccessor extends AbstractTestClass {
         + "    \"instance1\" : [ \"Helix:EMPTY_RESOURCE_ASSIGNMENT\", \"Helix:INSTANCE_NOT_ENABLED\", \"Helix:INSTANCE_NOT_STABLE\" ],\n"
         + "    \"instance2\" : [ \"Helix:MIN_ACTIVE_REPLICA_CHECK_FAILED\" ],\n"
         + "    \"instance3\" : [ \"Helix:HAS_DISABLED_PARTITION\", \"Helix:MIN_ACTIVE_REPLICA_CHECK_FAILED\" ],\n"
-        + "    \"instance4\" : [ \"Helix:EMPTY_RESOURCE_ASSIGNMENT\", \"Helix:INSTANCE_NOT_ALIVE\", \"Helix:INSTANCE_NOT_STABLE\" ]\n"
+        + "    \"instance4\" : [ \"Helix:EMPTY_RESOURCE_ASSIGNMENT\", \"Helix:INSTANCE_NOT_ALIVE\", \"Helix:INSTANCE_NOT_STABLE\" ],\n"
+        + "    \"invalidInstance\" : [ \"Helix:INSTANCE_NOT_EXIST\" ]\n"
         + "  }\n" + "}\n");
   }
 
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/json/cluster/TestClusterTopology.java b/helix-rest/src/test/java/org/apache/helix/rest/server/json/cluster/TestClusterTopology.java
index eadaa58..cbb0081 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/json/cluster/TestClusterTopology.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/json/cluster/TestClusterTopology.java
@@ -1,6 +1,7 @@
 package org.apache.helix.rest.server.json.cluster;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 import org.testng.Assert;
@@ -18,11 +19,12 @@ public class TestClusterTopology {
 
     List<ClusterTopology.Zone> zones = ImmutableList.of(new ClusterTopology.Zone("zone", instances));
 
-    ClusterTopology clusterTopology = new ClusterTopology("cluster0", zones);
+    ClusterTopology clusterTopology =
+        new ClusterTopology("cluster0", zones, Collections.emptySet());
     ObjectMapper mapper = new ObjectMapper();
     String result = mapper.writeValueAsString(clusterTopology);
 
     Assert.assertEquals(result,
-        "{\"id\":\"cluster0\",\"zones\":[{\"id\":\"zone\",\"instances\":[{\"id\":\"instance\"}]}]}");
+        "{\"id\":\"cluster0\",\"zones\":[{\"id\":\"zone\",\"instances\":[{\"id\":\"instance\"}]}],\"allInstances\":[]}");
   }
 }


[helix] 10/15: Upgrade Apache rat version and add exclusion paths

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit e30fe4bf313f230c50ba424a372c9c9dcf1a4735
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Mon Jun 10 15:29:58 2019 -0700

    Upgrade Apache rat version and add exclusion paths
    
    A part of the Helix release process requires the rat plugin to perform checks. However, there are scripts and website files that do not require this kind of check. This diff adds exclusion paths to the pom.xml. Note that there are several Java files that still do not pass the checks because they do not have the Apache license. This still needs to be fixed in the future.
    
    RB=1695987
    G=helix-reviewers
    A=jxue
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 pom.xml | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 71fd300..82c4f34 100644
--- a/pom.xml
+++ b/pom.xml
@@ -601,7 +601,15 @@ under the License.
         <plugin>
           <groupId>org.apache.rat</groupId>
           <artifactId>apache-rat-plugin</artifactId>
-          <version>0.8</version>
+          <version>0.13</version>
+          <configuration>
+            <!-- Exclude files/folders for Apache release -->
+            <excludes>
+              <exclude>website/**</exclude>
+              <exclude>.reviewboardrc</exclude>
+              <exclude>scripts/**</exclude>
+            </excludes>
+          </configuration>
         </plugin>
       </plugins>
     </pluginManagement>


[helix] 11/15: Remove relay message from controller's message cache immediately if the partition on relay host turned to ERROR state while transits off from top-state.

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 5db99fb6df198a57315b2f371186f6e9c64c1dda
Author: Lei Xia <lx...@linkedin.com>
AuthorDate: Tue Jun 4 17:15:17 2019 -0700

    Remove relay message from controller's message cache immediately if the partition on relay host turned to ERROR state while transits off from top-state.
    
    RB=1689771
    BUG=HELIX-1900
    G=helix-reviewers
    A=hulee
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../helix/common/caches/InstanceMessagesCache.java | 372 ++++++++++--------
 .../controller/stages/MessageSelectionStage.java   |   7 +-
 .../main/java/org/apache/helix/model/Message.java  |  22 +-
 .../messaging/p2pMessage/TestP2PMessages.java      | 415 +++++++++++++++++++++
 4 files changed, 651 insertions(+), 165 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index d936467..fb21141 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -29,7 +29,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.model.CurrentState;
@@ -61,15 +63,15 @@ public class InstanceMessagesCache {
   public static final String RELAY_MESSAGE_LIFETIME = "helix.controller.messagecache.relaymessagelifetime";
 
   // If Helix missed all of other events to evict a relay message from the cache, it will delete the message anyway after this timeout.
-  private static final int DEFAULT_RELAY_MESSAGE_LIFETIME = 120 * 1000;  // in ms
-  private final int _relayMessageLifetime;
+  private static final long DEFAULT_RELAY_MESSAGE_LIFETIME = TimeUnit.MINUTES.toMillis(60);  // in ms
+  private final long _relayMessageLifetime;
 
   private String _clusterName;
 
   public InstanceMessagesCache(String clusterName) {
     _clusterName = clusterName;
     _relayMessageLifetime = HelixUtil
-        .getSystemPropertyAsInt(RELAY_MESSAGE_LIFETIME, DEFAULT_RELAY_MESSAGE_LIFETIME);
+        .getSystemPropertyAsLong(RELAY_MESSAGE_LIFETIME, DEFAULT_RELAY_MESSAGE_LIFETIME);
   }
 
   /**
@@ -165,91 +167,62 @@ public class InstanceMessagesCache {
       }
     }
 
-    Map<String, Map<String, Message>> relayMessageMap = new HashMap<>();
     long nextRebalanceTime = Long.MAX_VALUE;
+    long currentTime = System.currentTimeMillis();
+    Map<String, Map<String, Message>> relayMessageMap = new HashMap<>();
 
     // Iterate all relay message in the cache, remove invalid or expired ones.
-    for (String instance : _relayMessageCache.keySet()) {
-      Map<String, Message> relayMessages = _relayMessageCache.get(instance);
+    for (String targetInstance : _relayMessageCache.keySet()) {
+      Map<String, Message> relayMessages = _relayMessageCache.get(targetInstance);
       Iterator<Map.Entry<String, Message>> iterator = relayMessages.entrySet().iterator();
 
       while (iterator.hasNext()) {
         Message relayMessage = iterator.next().getValue();
+        Map<String, Message> instanceMsgMap = _messageMap.get(targetInstance);
+
+        if (!relayMessage.isValid()) {
+          LOG.warn("Invalid relay message {}, remove it from the cache.", relayMessage.getId());
+          iterator.remove();
+          _relayHostMessageCache.remove(relayMessage.getMsgId());
+          continue;
+        }
 
-        Map<String, Message> instanceMsgMap = _messageMap.get(instance);
+        // Check whether the relay message has already been sent to the target host.
         if (instanceMsgMap != null && instanceMsgMap.containsKey(relayMessage.getMsgId())) {
           Message committedMessage = instanceMsgMap.get(relayMessage.getMsgId());
           if (committedMessage.isRelayMessage()) {
-            LOG.info("Relay message committed, remove relay message {} from the cache.", relayMessage
-                .getId());
+            LOG.info("Relay message already committed, remove relay message {} from the cache.",
+                relayMessage.getId());
             iterator.remove();
             _relayHostMessageCache.remove(relayMessage.getMsgId());
             continue;
           } else {
             // controller already sent the same message to target host,
-            // To avoid potential race-condition, do not remove relay message immediately,
-            // just set the relay time as current time .
-            if (relayMessage.getRelayTime() < 0) {
-              relayMessage.setRelayTime(System.currentTimeMillis());
-              LOG.info(
-                  "Controller already sent the message {} to the target host, set message to be relayed at {}",
-                  relayMessage.getId(), relayMessage.getRelayTime());
-            }
-          }
-        }
-
-        String sessionId = relayMessage.getTgtSessionId();
-        String instanceSessionId = liveInstanceMap.get(instance).getSessionId();
-
-        // Target host's session has been changed, remove relay message
-        if (!instanceSessionId.equals(sessionId)) {
-          LOG.info("Instance SessionId does not match, remove relay message {} from the cache.", relayMessage.getId());
-          iterator.remove();
-          _relayHostMessageCache.remove(relayMessage.getMsgId());
-          continue;
-        }
-
-        Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
-            currentStateMap.get(instance);
-        if (instanceCurrentStateMap == null) {
-          LOG.warn("CurrentStateMap null for " + instance);
-          continue;
-        }
-        Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
-        if (sessionCurrentStateMap == null) {
-          LOG.warn("CurrentStateMap null for {}, session {}.", instance, sessionId);
-          continue;
-        }
-
-        String resourceName = relayMessage.getResourceName();
-        String partitionName = relayMessage.getPartitionName();
-        String targetState = relayMessage.getToState();
-        String fromState = relayMessage.getFromState();
-        CurrentState currentState = sessionCurrentStateMap.get(resourceName);
-
-        long currentTime = System.currentTimeMillis();
-        if (currentState == null) {
-          if (relayMessage.getRelayTime() < 0) {
-            relayMessage.setRelayTime(currentTime);
-            LOG.warn("CurrentState is null for {} on {}, set relay time {} for message {}",
-                resourceName, instance, relayMessage.getRelayTime(), relayMessage.getId());
+            // Relay host may still forward the p2p message later, so we can not remove the relay message immediately now,
+            // just set the relay time as current time.
+            // TODO: we should remove the message immediately here once we have transaction id support in CurrentState.
+            LOG.info(
+                "Controller already sent the message to the target host, set relay message {} to be expired.",
+                relayMessage.getId());
+            setMessageRelayTime(relayMessage, currentTime);
           }
         }
 
-        // if partition state on the target host already changed, set it to be expired.
-        String partitionState = currentState.getState(partitionName);
-        if (targetState.equals(partitionState) || !fromState.equals(partitionState)) {
-          if (relayMessage.getRelayTime() < 0) {
-            relayMessage.setRelayTime(currentTime);
-            LOG.debug(
-                "CurrentState {} on target host has changed, set relay time {} for message {}.",
-                partitionState, relayMessage.getRelayTime(), relayMessage.getId());
-          }
+        try {
+          // Check partition's state on the relay message's target host (The relay message's destination host).
+          // Set the relay message to be expired immediately or to be expired a certain time in future if necessary.
+          checkTargetHost(targetInstance, relayMessage, liveInstanceMap, currentStateMap);
+
+          // Check partition's state on the original relay host (host that should forward the relay message)
+          // Set the relay message to be expired immediately or to be expired a certain time in future if necessary.
+          Message hostedMessage = _relayHostMessageCache.get(relayMessage.getMsgId());
+          checkRelayHost(relayMessage, liveInstanceMap, currentStateMap, hostedMessage);
+        } catch (Exception e) {
+          LOG.warn(
+              "Failed to check target and relay host and set the relay time. Relay message: {} exception: {}",
+              relayMessage.getId(), e);
         }
 
-        // derive relay time from hosted message and relayed host.
-        setRelayTime(relayMessage, liveInstanceMap, currentStateMap);
-
         if (relayMessage.isExpired()) {
           LOG.info("relay message {} expired, remove it from cache. relay time {}.",
               relayMessage.getId(), relayMessage.getRelayTime());
@@ -272,19 +245,23 @@ public class InstanceMessagesCache {
           continue;
         }
 
+        if (!relayMessageMap.containsKey(targetInstance)) {
+          relayMessageMap.put(targetInstance, Maps.<String, Message>newHashMap());
+        }
+        relayMessageMap.get(targetInstance).put(relayMessage.getMsgId(), relayMessage);
+
+        // Compute the next earliest time to trigger a pipeline run.
         long expiryTime = relayMessage.getCreateTimeStamp() + _relayMessageLifetime;
         if (relayMessage.getRelayTime() > 0) {
           expiryTime = relayMessage.getRelayTime() + relayMessage.getExpiryPeriod();
         }
-
         if (expiryTime < nextRebalanceTime) {
           nextRebalanceTime = expiryTime;
         }
+      } // end while (iterator.hasNext())
 
-        if (!relayMessageMap.containsKey(instance)) {
-          relayMessageMap.put(instance, Maps.<String, Message>newHashMap());
-        }
-        relayMessageMap.get(instance).put(relayMessage.getMsgId(), relayMessage);
+      if (relayMessages.isEmpty()) {
+        _relayMessageCache.remove(targetInstance);
       }
     }
 
@@ -293,122 +270,195 @@ public class InstanceMessagesCache {
     }
 
     _relayMessageMap = Collections.unmodifiableMap(relayMessageMap);
+    long relayMessageCount = 0;
 
-    long relayCount = 0;
-    // Add valid relay message to the instance message map.
+    // Add valid relay messages to the instance message map.
     for (String instance : _relayMessageMap.keySet()) {
       Map<String, Message> relayMessages = _relayMessageMap.get(instance);
       if (!_messageMap.containsKey(instance)) {
         _messageMap.put(instance, Maps.<String, Message>newHashMap());
       }
       _messageMap.get(instance).putAll(relayMessages);
-      relayCount += relayMessages.size();
+      relayMessageCount += relayMessages.size();
     }
 
-    if (LOG.isDebugEnabled()) {
-      if (relayCount > 0) {
-        LOG.debug("Relay message cache size " + relayCount);
-      }
-    }
+    LOG.info(
+        "END: updateRelayMessages(), {} of valid relay messages in cache, took {} ms. ",
+        relayMessageCount, (System.currentTimeMillis() - currentTime));
   }
 
-  // Schedule a future rebalance pipeline run.
-  private void scheduleFuturePipeline(long rebalanceTime) {
-    GenericHelixController controller = GenericHelixController.getController(_clusterName);
-    if (controller != null) {
-      controller.scheduleRebalance(rebalanceTime);
-    } else {
-      LOG.warn(
-          "Failed to schedule a future pipeline run for cluster {} at delay {}, helix controller is null.",
-          _clusterName, (rebalanceTime - System.currentTimeMillis()));
+  // Check partition's state on the relay message's target host (The relay message's destination host).
+  // Set the relay message to be expired immediately or to be expired a certain time in future if necessary.
+  private void checkTargetHost(String targetHost, Message relayMessage, Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
+
+    long currentTime = System.currentTimeMillis();
+    String resourceName = relayMessage.getResourceName();
+    String partitionName = relayMessage.getPartitionName();
+    String sessionId = relayMessage.getTgtSessionId();
+
+    if (!liveInstanceMap.containsKey(targetHost)) {
+      LOG.info("Target host is not alive anymore, expiring relay message {} immediately.",
+          relayMessage.getId());
+      relayMessage.setExpired(true);
+      return;
     }
-  }
 
-  private void setRelayTime(Message relayMessage, Map<String, LiveInstance> liveInstanceMap,
-      Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
-    // relay time already set, avoid to reset it to a later time.
-    if (relayMessage.getRelayTime() > relayMessage.getCreateTimeStamp()) {
+    String instanceSessionId = liveInstanceMap.get(targetHost).getSessionId();
+
+    // Target host's session has been changed, remove relay message
+    if (!instanceSessionId.equals(sessionId)) {
+      LOG.info("Instance SessionId does not match, expiring relay message {} immediately.",
+          relayMessage.getId());
+      relayMessage.setExpired(true);
       return;
     }
 
-    // Relay time has not been set. Proceed to set the relay time
-    try {
-      long currentTime = System.currentTimeMillis();
-      long expiredTime = currentTime + relayMessage.getExpiryPeriod();
-
-      Message hostedMessage = _relayHostMessageCache.get(relayMessage.getMsgId());
-      String sessionId = hostedMessage.getTgtSessionId();
-      String instance = hostedMessage.getTgtName();
-      String resourceName = hostedMessage.getResourceName();
-      if (!liveInstanceMap.containsKey(instance)) {
-        // It's possible that hostedMsg's target is no longer live. In this case, we just set the
-        // relay time and return so that the message could be cleaned up after a short delay
-        relayMessage.setRelayTime(currentTime);
-        return;
-      }
-      String instanceSessionId = liveInstanceMap.get(instance).getSessionId();
-
-      if (!instanceSessionId.equals(sessionId)) {
-        LOG.debug(
-            "Hosted Instance SessionId {} does not match sessionId {} in hosted message , set relay message {} to be expired at {}, hosted message ",
-            instanceSessionId, sessionId, relayMessage.getId(), expiredTime,
-            hostedMessage.getMsgId());
-        relayMessage.setRelayTime(currentTime);
-        return;
-      }
+    Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+        currentStateMap.get(targetHost);
+    if (instanceCurrentStateMap == null || !instanceCurrentStateMap.containsKey(sessionId)) {
+      // This should happen only when a new session is being established.
+      // We should not do anything here, once new session is established in participant side,
+      // the relay message will be deleted from cache in controller's next pipeline.
+      LOG.warn("CurrentStateMap null for {}, session {}, pending relay message {}", targetHost,
+          sessionId, relayMessage.getId());
+      return;
+    }
 
-      Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
-          currentStateMap.get(instance);
-      if (instanceCurrentStateMap == null) {
-        LOG.debug(
-            "No instanceCurrentStateMap found for {} on {}, set relay messages {} to be expired at {}"
-                + resourceName,
-            instance, relayMessage.getId(), expiredTime);
-        relayMessage.setRelayTime(currentTime);
-        return;
-      }
+    Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
+    CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+    // TODO: we should add transaction id for each state transition, we can immediately delete the relay message once we have transaction id record in each currentState.
+    if (currentState == null) {
+      setMessageRelayTime(relayMessage, currentTime);
+      LOG.warn("CurrentState is null for {} on {}, set relay time {} for message {}", resourceName,
+          targetHost, relayMessage.getRelayTime(), relayMessage.getId());
+      return;
+    }
 
-      Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
-      if (sessionCurrentStateMap == null) {
-        LOG.debug("No sessionCurrentStateMap found, set relay messages {} to be expired at {}. ",
-            relayMessage.getId(), expiredTime);
-        relayMessage.setRelayTime(currentTime);
-        return;
-      }
+    // If the target partition already completed the state transition,
+    // or the current state on the target partition has been changed,
+    // do not remove the message immediately, to avoid a race condition:
+    // for example, the controller may decide to move the master to another instance at this time,
+    // and if it is not aware of the pending relay message, it may end up with two masters.
+    // So we only set the relay time here and let the relay message expire.
+    // TODO: we should add transaction id for each state transition, we can immediately delete the relay message once we have transaction id record in each currentState.
+    String partitionCurrentState = currentState.getState(partitionName);
+    String targetState = relayMessage.getToState();
+    String fromState = relayMessage.getFromState();
+    if (targetState.equals(partitionCurrentState) || !fromState.equals(partitionCurrentState)) {
+      setMessageRelayTime(relayMessage, currentTime);
+      LOG.debug("{}'s currentState {} on {} has changed, set relay message {} to be expired.",
+          partitionName, partitionCurrentState, targetHost, relayMessage.getId());
+    }
+  }
+
+  // Check the partition's state on the relay host (the target of the hosted message, which forwards the relay message).
+  // Mark the relay message as expired immediately, or set its relay time so that it expires later, if necessary.
+  private void checkRelayHost(Message relayMessage, Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Map<String, Map<String, CurrentState>>> currentStateMap, Message hostedMessage) {
+
+    long currentTime = System.currentTimeMillis();
+
+    String sessionId = hostedMessage.getTgtSessionId();
+    String relayInstance = hostedMessage.getTgtName();
+    String resourceName = hostedMessage.getResourceName();
+    String partitionName = hostedMessage.getPartitionName();
+
+    if (!liveInstanceMap.containsKey(relayInstance)) {
+      // If the p2p forwarding host is no longer live, we should not remove the relay message immediately
+      // since we do not know whether the relay message was forwarded before the instance went offline.
+      setMessageRelayTime(relayMessage, currentTime);
+      return;
+    }
+    String instanceSessionId = liveInstanceMap.get(relayInstance).getSessionId();
+    if (!instanceSessionId.equals(sessionId)) {
+      LOG.info("Relay instance sessionId {} does not match sessionId {} in hosted message {}, "
+              + "set relay message {} to be expired.", instanceSessionId, sessionId,
+          hostedMessage.getMsgId(), relayMessage.getId());
+      setMessageRelayTime(relayMessage, currentTime);
+      return;
+    }
+
+    Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+        currentStateMap.get(relayInstance);
+    if (instanceCurrentStateMap == null || !instanceCurrentStateMap.containsKey(sessionId)) {
+      LOG.warn(
+          "CurrentStateMap null for {}, session {}, set relay messages {} to be expired. Hosted message {}.",
+          relayInstance, sessionId, relayMessage.getId(), hostedMessage.getId());
+      setMessageRelayTime(relayMessage, currentTime);
+      return;
+    }
+
+    Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
+    CurrentState currentState = sessionCurrentStateMap.get(resourceName);
 
-      String partitionName = hostedMessage.getPartitionName();
-      String targetState = hostedMessage.getToState();
-      String fromState = hostedMessage.getFromState();
+    if (currentState == null) {
+      LOG.info("No currentState found for {} on {}, set relay message {} to be expired.",
+          resourceName, relayInstance, relayMessage.getId());
+      setMessageRelayTime(relayMessage, currentTime);
+      return;
+    }
 
-      CurrentState currentState = sessionCurrentStateMap.get(resourceName);
-      if (currentState == null) {
-        LOG.debug("No currentState found for {} on {}, set relay message {} to be expired at {} ",
-            resourceName, instance, relayMessage.getId(),
-            (currentTime + relayMessage.getExpiryPeriod()));
-        relayMessage.setRelayTime(currentTime);
+    String partitionState = currentState.getState(partitionName);
+    String targetState = hostedMessage.getToState();
+    String fromState = hostedMessage.getFromState();
+
+    // The relay host partition state has been changed after relay message was created.
+    if (!fromState.equals(partitionState)) {
+      // If the partition on the relay host turned to ERROR while transitioning off the top state,
+      // we can remove the cached relay message right away since participant won't forward the relay message anyway.
+      if (HelixDefinedState.ERROR.name().equals(partitionState) && fromState
+          .equals(currentState.getPreviousState(partitionName))) {
+        LOG.info("Partition {} got to ERROR from the top state, "
+                + "expiring relay message {} immediately. Hosted message {}.", partitionName,
+            relayMessage.getId(), hostedMessage.getId());
+        relayMessage.setExpired(true);
         return;
       }
 
-      if (targetState.equals(currentState.getState(partitionName))) {
+      // If the partition completed the transition, set the relay time to be the actual time when state transition completed.
+      if (targetState.equals(partitionState) && fromState
+          .equals(currentState.getPreviousState(partitionName))) {
+        // The relay host already completed the state transition.
         long completeTime = currentState.getEndTime(partitionName);
-        if (completeTime < relayMessage.getCreateTimeStamp()) {
-          completeTime = currentTime;
+        if (completeTime > relayMessage.getCreateTimeStamp()) {
+          setMessageRelayTime(relayMessage, completeTime);
+          LOG.info("Target state for partition {} matches the hosted message's target state, "
+              + "set relay message {} to be expired.", partitionName, relayMessage.getId());
+          return;
         }
-        relayMessage.setRelayTime(completeTime);
-        LOG.debug(
-            "Target state match the hosted message's target state, set relay message {} relay time at {}.",
-            relayMessage.getId(), completeTime);
       }
 
-      if (!fromState.equals(currentState.getState(partitionName))) {
-        LOG.debug(
-            "Current state does not match hosted message's from state, set relay message {} relay time at {}.",
-            relayMessage.getId(), currentTime);
-        relayMessage.setRelayTime(currentTime);
-      }
-    } catch (Exception e) {
-      LOG.warn("Failed to set the relay time. RelayMsgId: {} Exception: {}", relayMessage.getId(),
-          e);
+      // For all other situations, set relay time to be current time.
+      setMessageRelayTime(relayMessage, currentTime);
+      // the state has been changed after it completed the required state transition (maybe another state-transition happened).
+      LOG.info("Current state {} for partition {} does not match hosted message's from state, "
+              + "set relay message {} to be expired.", partitionState, partitionName,
+          relayMessage.getId());
+    }
+  }
+
+  private void setMessageRelayTime(Message relayMessage, long relayTime) {
+    long currentRelayTime = relayMessage.getRelayTime();
+    // A relay time later than the create time is already set; never push it back to a later time.
+    if (currentRelayTime > relayMessage.getCreateTimeStamp() && currentRelayTime < relayTime) {
+      return;
+    }
+    relayMessage.setRelayTime(relayTime);
+    LOG.info("Set relay message {} relay time at {}, to be expired at {}", relayMessage.getId(),
+        relayTime, (relayTime + relayMessage.getExpiryPeriod()));
+  }
+
+
+  // Schedule a future rebalance pipeline run.
+  private void scheduleFuturePipeline(long rebalanceTime) {
+    GenericHelixController controller = GenericHelixController.getController(_clusterName);
+    if (controller != null) {
+      controller.scheduleRebalance(rebalanceTime);
+    } else {
+      LOG.warn(
+          "Failed to schedule a future pipeline run for cluster {} at delay {}, helix controller is null.",
+          _clusterName, (rebalanceTime - System.currentTimeMillis()));
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 9fd66d2..292654a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -208,11 +208,12 @@ public class MessageSelectionStage extends AbstractBaseStage {
             if (relayMsg.getToState().equals(toState) && relayMsg.getFromState()
                 .equals(fromState)) {
               LOG.info(
-                  "There is pending relay message, pending relay message: {}", relayMsg);
+                  "There is pending relay message, pending relay message: {}, relay time starts {}, expiry timeout {}.",
+                  relayMsg.getMsgId(), relayMsg.getRelayTime(), relayMsg.getExpiryPeriod());
               if (!relayMsg.getTgtName().equals(message.getTgtName())) {
                 LOG.info(
-                    "There is pending relay message to a different host, not send message: {}, pending relay message: {}",
-                    message, relayMsg);
+                    "The pending relay message was sent to a different host, not send message: {}, pending relay message: {}",
+                    message.getMsgId(), relayMsg.getId());
                 continue NextMessage;
               }
             }
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 4177b44..607e967 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -112,6 +112,10 @@ public class Message extends HelixProperty {
   // default expiry time period for a relay message.
   public static final long RELAY_MESSAGE_DEFAULT_EXPIRY = 5 * 1000;  //5 second
 
+  // This field is not persisted in ZK (znode), i.e., its value only changes in the local cached copy of the message.
+  // Currently, the field is only used for invalidating messages in the controller's message cache.
+  private boolean _expired = false;
+
   /**
    * Compares the creation time of two Messages
    */
@@ -821,13 +825,16 @@ public class Message extends HelixProperty {
    * @return
    */
   public boolean isExpired() {
+    if (_expired) {
+      return true;
+    }
+
     long expiry = getExpiryPeriod();
     if (expiry < 0) {
       return false;
     }
 
     long current = System.currentTimeMillis();
-
     // use relay time if this is a relay message
     if (isRelayMessage()) {
       long relayTime = getRelayTime();
@@ -838,6 +845,19 @@ public class Message extends HelixProperty {
   }
 
   /**
+   * Mark this message as expired (or clear the mark).
+   *
+   * !! CAUTION: The expired field is not persisted into the ZNode,
+   * i.e., setting this field only changes the value in the local cached copy,
+   * not the one on ZK, even if ZkClient.Set(Message) is called to persist the message into ZK.
+   * This method should NOT be called by any non-Helix code.
+   * @param expired true to make isExpired() return true regardless of the expiry period
+   */
+  public void setExpired(boolean expired) {
+    _expired = expired;
+  }
+
+  /**
    * Get the expiry period (in milliseconds)
    *
    * @return
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
new file mode 100644
index 0000000..115acd6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
@@ -0,0 +1,415 @@
+package org.apache.helix.messaging.p2pMessage;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.reflect.Method;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BaseStageTest;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
+import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.testng.Assert;
+import org.testng.ITestContext;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestP2PMessages extends BaseStageTest {
+  private String _db = "testDB";
+  private int _numPartition = 1;
+  private int _numReplica = 3;
+
+  private Partition _partition = new Partition(_db + "_0");
+
+  private ResourceControllerDataProvider _dataCache;
+  private Pipeline _fullPipeline;
+
+  private ResourcesStateMap _initialStateMap;
+
+  private Set<String> _instances;
+  private Map<String, LiveInstance> _liveInstanceMap;
+  private String _initialMaster;
+
+  @BeforeClass
+  public void beforeClass() {
+    super.beforeClass();
+    setup();
+    setupIdealState(3, new String[] { _db }, _numPartition, _numReplica,
+        IdealState.RebalanceMode.SEMI_AUTO, BuiltInStateModelDefinitions.MasterSlave.name());
+    setupStateModel();
+    setupInstances(3);
+    setupLiveInstances(3);
+
+    ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
+    clusterConfig.enableP2PMessage(true);
+    setClusterConfig(clusterConfig);
+
+    _dataCache = new ResourceControllerDataProvider(_clusterName);
+    _dataCache.setAsyncTasksThreadPool(Executors.newSingleThreadExecutor());
+
+    _dataCache.refresh(manager.getHelixDataAccessor());
+
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), _dataCache);
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+
+    _fullPipeline = new Pipeline("FullPipeline");
+    _fullPipeline.addStage(new ReadClusterDataStage());
+    _fullPipeline.addStage(new ResourceComputationStage());
+    _fullPipeline.addStage(new CurrentStateComputationStage());
+    _fullPipeline.addStage(new BestPossibleStateCalcStage());
+    _fullPipeline.addStage(new IntermediateStateCalcStage());
+    _fullPipeline.addStage(new ResourceMessageGenerationPhase());
+    _fullPipeline.addStage(new MessageSelectionStage());
+    _fullPipeline.addStage(new MessageThrottleStage());
+    _fullPipeline.addStage(new ResourceMessageDispatchStage());
+
+    try {
+      _fullPipeline.handle(event);
+    } catch (Exception e) {
+      Assert.fail("Initial pipeline run failed; the initial state map cannot be computed.", e);
+    }
+
+    _instances = _dataCache.getAllInstances();
+    _liveInstanceMap = _dataCache.getLiveInstances();
+
+    _initialStateMap = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+    _initialMaster = getTopStateInstance(_initialStateMap.getInstanceStateMap(_db, _partition),
+        MasterSlaveSMD.States.MASTER.name());
+    Assert.assertNotNull(_initialMaster);
+  }
+
+  @BeforeMethod  // just to override the per-test setup in base class.
+  public void beforeTest(Method testMethod, ITestContext testContext) {
+    long startTime = System.currentTimeMillis();
+    System.out.println("START " + testMethod.getName() + " at " + new Date(startTime));
+    testContext.setAttribute("StartTime", System.currentTimeMillis());
+  }
+
+  @Test
+  public void testP2PSendAndTimeout() throws Exception {
+    reset(_initialStateMap);
+
+    // Disable the old master (initialMaster) instance.
+    // Validate: an M->S message should be sent to initialMaster with a P2P message attached for secondMaster.
+    admin.enableInstance(_clusterName, _initialMaster, false);
+    _dataCache = event.getAttribute(AttributeName.ControllerDataProvider.name());
+    _dataCache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
+
+    _fullPipeline.handle(event);
+
+    ResourcesStateMap bestpossibleState =
+        event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
+    List<Message> messages = getMessages(_initialMaster);
+
+    Assert.assertEquals(messages.size(), 1);
+    Message toSlaveMessage = messages.get(0);
+    Assert.assertEquals(toSlaveMessage.getTgtName(), _initialMaster);
+    Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
+    Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
+
+    // Verify that the p2p message is attached to the M->S message sent to the old master instance.
+    Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);
+    String secondMaster =
+        getTopStateInstance(bestpossibleState.getInstanceStateMap(_db, _partition),
+            MasterSlaveSMD.States.MASTER.name());
+
+    Message relayMessage = toSlaveMessage.getRelayMessage(secondMaster);
+    Assert.assertNotNull(relayMessage);
+    Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+    Assert.assertEquals(relayMessage.getTgtName(), secondMaster);
+    Assert.assertEquals(relayMessage.getRelaySrcHost(), _initialMaster);
+    Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
+
+    // The old master (initialMaster) completed the M->S transition,
+    // but has not forwarded the p2p message to the new master (secondMaster) yet.
+    // Validate: the controller should not send an S->M message to the new master.
+    handleMessage(_initialMaster, _db);
+    _fullPipeline.handle(event);
+    messages = getMessages(secondMaster);
+    Assert.assertEquals(messages.size(), 0);
+
+    // The old master (initialMaster) completed the M->S transition and has still not forwarded
+    // the p2p message to the new master (secondMaster), but the p2p message should have timed out by now.
+    // Validate: the controller should send an S->M message to the new master.
+    Thread.sleep(Message.RELAY_MESSAGE_DEFAULT_EXPIRY);
+
+    _fullPipeline.handle(event);
+    messages = getMessages(secondMaster);
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertEquals(messages.get(0).getTgtName(), secondMaster);
+    Assert.assertEquals(messages.get(0).getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(messages.get(0).getToState(), MasterSlaveSMD.States.MASTER.name());
+  }
+
+  @Test
+  public void testP2PWithErrorState() throws Exception {
+    reset(_initialStateMap);
+    // Disable the old master (initialMaster) instance.
+    // Validate: an M->S message should be sent to initialMaster with a P2P message attached for secondMaster.
+
+    // Disable the existing master instance.
+    admin.enableInstance(_clusterName, _initialMaster, false);
+    _dataCache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
+    _fullPipeline.handle(event);
+
+    ResourcesStateMap bestpossibleState =
+        event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
+    List<Message> messages = getMessages(_initialMaster);
+
+    Assert.assertEquals(messages.size(), 1);
+    Message toSlaveMessage = messages.get(0);
+
+    // Verify that the p2p message is attached to the M->S message sent to the old master instance.
+    Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);
+    String secondMaster =
+        getTopStateInstance(bestpossibleState.getInstanceStateMap(_db, _partition),
+            MasterSlaveSMD.States.MASTER.name());
+    Message relayMessage = toSlaveMessage.getRelayMessage(secondMaster);
+    Assert.assertNotNull(relayMessage);
+    Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+    Assert.assertEquals(relayMessage.getTgtName(), secondMaster);
+
+    // The old master (initialMaster) failed the M->S transition (partition set to ERROR below),
+    // and has not forwarded the p2p message to the new master (secondMaster).
+    // Validate: the controller should ignore the ERROR partition and send an S->M message to the new master.
+    String session = _dataCache.getLiveInstances().get(_initialMaster).getSessionId();
+    PropertyKey currentStateKey =
+        new PropertyKey.Builder(_clusterName).currentState(_initialMaster, session, _db);
+    CurrentState currentState = accessor.getProperty(currentStateKey);
+    currentState
+        .setPreviousState(_partition.getPartitionName(), MasterSlaveSMD.States.MASTER.name());
+    currentState.setState(_partition.getPartitionName(), HelixDefinedState.ERROR.name());
+    currentState.setEndTime(_partition.getPartitionName(), System.currentTimeMillis());
+    accessor.setProperty(currentStateKey, currentState);
+
+    PropertyKey messageKey =
+        new PropertyKey.Builder(_clusterName).message(_initialMaster, messages.get(0).getMsgId());
+    accessor.removeProperty(messageKey);
+
+    _fullPipeline.handle(event);
+
+    messages = getMessages(secondMaster);
+
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertEquals(messages.get(0).getTgtName(), secondMaster);
+    Assert.assertEquals(messages.get(0).getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(messages.get(0).getToState(), MasterSlaveSMD.States.MASTER.name());
+  }
+
+  @Test
+  public void testP2PWithInstanceOffline() throws Exception {
+    reset(_initialStateMap);
+    // Disable the old master (initialMaster) instance.
+    // Validate: an M->S message should be sent to initialMaster with a P2P message attached for secondMaster.
+
+    // Disable the existing master instance.
+    admin.enableInstance(_clusterName, _initialMaster, false);
+    _dataCache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
+    _fullPipeline.handle(event);
+
+    ResourcesStateMap bestpossibleState =
+        event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
+    List<Message> messages = getMessages(_initialMaster);
+
+    Assert.assertEquals(messages.size(), 1);
+    Message toSlaveMessage = messages.get(0);
+
+    // Verify that the p2p message is attached to the M->S message sent to the old master instance.
+    Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);
+    String secondMaster =
+        getTopStateInstance(bestpossibleState.getInstanceStateMap(_db, _partition),
+            MasterSlaveSMD.States.MASTER.name());
+    Message relayMessage = toSlaveMessage.getRelayMessage(secondMaster);
+    Assert.assertNotNull(relayMessage);
+    Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+    Assert.assertEquals(relayMessage.getTgtName(), secondMaster);
+
+    // The old master (initialMaster) completed the M->S transition,
+    // but has not forwarded the p2p message to the new master (secondMaster) yet.
+    // Validate: the controller should not send an S->M message to the new master.
+    handleMessage(_initialMaster, _db);
+    _fullPipeline.handle(event);
+    messages = getMessages(secondMaster);
+    Assert.assertEquals(messages.size(), 0);
+
+    // The new master (secondMaster) instance goes offline;
+    // the controller should send S->M to the third master immediately.
+    PropertyKey liveInstanceKey = new PropertyKey.Builder(_clusterName).liveInstance(secondMaster);
+    accessor.removeProperty(liveInstanceKey);
+    _dataCache.requireFullRefresh();
+
+    _fullPipeline.handle(event);
+
+    bestpossibleState = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+    String thirdMaster = getTopStateInstance(bestpossibleState.getInstanceStateMap(_db, _partition),
+        MasterSlaveSMD.States.MASTER.name());
+
+    Assert.assertFalse(secondMaster.equals(thirdMaster));
+    messages = getMessages(thirdMaster);
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertEquals(messages.get(0).getTgtName(), thirdMaster);
+    Assert.assertEquals(messages.get(0).getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(messages.get(0).getToState(), MasterSlaveSMD.States.MASTER.name());
+  }
+
+  /**
+   * Simulates a participant (without starting a real participant thread) handling its pending
+   * messages. For every pending message of {@code instance} that targets {@code resource} and
+   * whose from-state matches the partition's recorded CurrentState (or, when no state is recorded
+   * yet, the MasterSlave initial state), the CurrentState is set to the message's to-state and the
+   * pending message is removed from ZK. Messages that do not match are left untouched.
+   * @param instance name of the instance whose pending messages are handled
+   * @param resource name of the resource whose messages are handled
+   */
+  private void handleMessage(String instance, String resource) {
+    PropertyKey propertyKey = new PropertyKey.Builder(_clusterName).messages(instance);
+    List<Message> messages = accessor.getChildValues(propertyKey);
+    String session = _dataCache.getLiveInstances().get(instance).getSessionId();
+
+    for (Message m : messages) {
+      if (!m.getResourceName().equals(resource)) {
+        continue;
+      }
+
+      PropertyKey currentStateKey =
+          new PropertyKey.Builder(_clusterName).currentState(instance, session, resource);
+      CurrentState currentState = accessor.getProperty(currentStateKey);
+      if (currentState == null) {
+        // No CurrentState stored for this session/resource yet; start from an empty one.
+        currentState = new CurrentState(resource);
+        currentState.setSessionId(session);
+        currentState.setStateModelDefRef(BuiltInStateModelDefinitions.MasterSlave.name());
+      }
+
+      String partition = m.getPartitionName();
+      String fromState = m.getFromState();
+      String toState = m.getToState();
+      String partitionState = currentState.getState(partition);
+
+      // A partition without a recorded state only matches a message that starts from the initial
+      // state. The null case is decided separately so that a non-matching message is skipped
+      // instead of failing with a NullPointerException on partitionState.
+      boolean fromStateMatches;
+      if (partitionState == null) {
+        fromStateMatches = fromState != null && fromState.equals(
+            BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition().getInitialState());
+      } else {
+        fromStateMatches = partitionState.equals(fromState);
+      }
+      if (!fromStateMatches) {
+        continue;
+      }
+
+      currentState.setPreviousState(partition, fromState);
+      currentState.setState(partition, toState);
+      currentState.setStartTime(partition, System.currentTimeMillis());
+      try {
+        // Presumably keeps the recorded end time later than the start time.
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller instead of silently dropping it.
+        Thread.currentThread().interrupt();
+      }
+      currentState.setEndTime(partition, System.currentTimeMillis());
+      accessor.setProperty(currentStateKey, currentState);
+      PropertyKey messageKey =
+          new PropertyKey.Builder(_clusterName).message(instance, m.getMsgId());
+      accessor.removeProperty(messageKey);
+    }
+  }
+
+  /**
+   * Resets the test cluster: writes every known live instance back, enables all instances, cleans
+   * all pending messages, and sets CurrentState to the given BestPossibleState.
+   * @param bestpossibleState the state mapping that CurrentState is set to
+   */
+  private void reset(ResourcesStateMap bestpossibleState) {
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    for (LiveInstance liveInstance : _liveInstanceMap.values()) {
+      accessor.setProperty(keyBuilder.liveInstance(liveInstance.getId()), liveInstance);
+    }
+    for (String ins : _instances) {
+      // Enable the instance being visited; previously only _initialMaster was re-enabled,
+      // once per loop iteration.
+      admin.enableInstance(_clusterName, ins, true);
+      cleanMessages(ins);
+    }
+    for (String resource : bestpossibleState.resourceSet()) {
+      setCurrentState(resource, bestpossibleState.getPartitionStateMap(resource).getStateMap());
+    }
+    // Clean the pending messages once more after CurrentState has been rewritten.
+    for (String ins : _instances) {
+      cleanMessages(ins);
+    }
+    _dataCache.requireFullRefresh();
+  }
+
+  /**
+   * Writes the given partition-to-instance state assignment into the CurrentState of each
+   * instance for the resource. A CurrentState that does not exist yet is created with the
+   * instance's session ID and the MasterSlave state model.
+   */
+  private void setCurrentState(String resource,
+      Map<Partition, Map<String, String>> partitionStateMap) {
+    for (Map.Entry<Partition, Map<String, String>> partitionEntry : partitionStateMap.entrySet()) {
+      String partitionName = partitionEntry.getKey().getPartitionName();
+      for (Map.Entry<String, String> replica : partitionEntry.getValue().entrySet()) {
+        String instanceName = replica.getKey();
+        String sessionId = _liveInstanceMap.get(instanceName).getSessionId();
+        PropertyKey stateKey =
+            new PropertyKey.Builder(_clusterName).currentState(instanceName, sessionId, resource);
+
+        CurrentState stored = accessor.getProperty(stateKey);
+        if (stored == null) {
+          stored = new CurrentState(resource);
+          stored.setSessionId(sessionId);
+          stored.setStateModelDefRef(BuiltInStateModelDefinitions.MasterSlave.name());
+        }
+        stored.setState(partitionName, replica.getValue());
+        accessor.setProperty(stateKey, stored);
+      }
+    }
+  }
+
+  /** Removes every pending message of the given instance. */
+  private void cleanMessages(String instance) {
+    List<Message> pending =
+        accessor.getChildValues(new PropertyKey.Builder(_clusterName).messages(instance));
+    for (Message pendingMessage : pending) {
+      PropertyKey messageKey =
+          new PropertyKey.Builder(_clusterName).message(instance, pendingMessage.getMsgId());
+      accessor.removeProperty(messageKey);
+    }
+  }
+
+  /** Returns the pending messages of the given instance as read through the data accessor. */
+  List<Message> getMessages(String instance) {
+    PropertyKey messagesKey = new PropertyKey.Builder(_clusterName).messages(instance);
+    return accessor.getChildValues(messagesKey);
+  }
+
+  /**
+   * Finds an instance whose state equals {@code topState}. If several instances match, the one
+   * visited last during iteration wins; returns {@code null} when none matches.
+   */
+  private String getTopStateInstance(Map<String, String> instanceStateMap, String topState) {
+    String topStateHolder = null;
+    for (String instanceName : instanceStateMap.keySet()) {
+      String state = instanceStateMap.get(instanceName);
+      if (topState.equals(state)) {
+        topStateHolder = instanceName;
+      }
+    }
+    return topStateHolder;
+  }
+}


[helix] 03/15: Upgrade ZK to 3.4.13

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 9707b382cf184fbdb6d5573ca79367148785dfbd
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri May 31 17:08:03 2019 -0700

    Upgrade ZK to 3.4.13
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 helix-core/helix-core-0.8.5-SNAPSHOT.ivy | 2 +-
 helix-core/pom.xml                       | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/helix-core/helix-core-0.8.5-SNAPSHOT.ivy b/helix-core/helix-core-0.8.5-SNAPSHOT.ivy
index 3f4f9a6..c61b78f 100644
--- a/helix-core/helix-core-0.8.5-SNAPSHOT.ivy
+++ b/helix-core/helix-core-0.8.5-SNAPSHOT.ivy
@@ -52,7 +52,7 @@ under the License.
     <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.7.14" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
         <artifact name="slf4j-log4j12" ext="jar"/>
     </dependency>
-    <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.11" conf="compile->compile(default);runtime->runtime(default);default->default"/>
+    <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" conf="compile->compile(default);runtime->runtime(default);default->default"/>
     <dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="1.8.5" conf="compile->compile(default);runtime->runtime(default);default->default"/>
     <dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="1.8.5" conf="compile->compile(default);runtime->runtime(default);default->default"/>
     <dependency org="commons-io" name="commons-io" rev="1.4" conf="compile->compile(default);runtime->runtime(default);default->default"/>
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index 9b6cf44..d446b10 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -85,7 +85,7 @@ under the License.
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
-      <version>3.4.11</version>
+      <version>3.4.13</version>
       <exclusions>
         <exclusion>
           <groupId>junit</groupId>


[helix] 01/15: Introduce ZkPathStatRecord to record watch reinstall in ZkClient.

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit cc14cfd4c17ba9c26047bbd08fb2b1d249546f2d
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Thu May 23 00:52:18 2019 -0700

    Introduce ZkPathStatRecord to record watch reinstall in ZkClient.
    
    This is to avoid duplicate watch re-installs in the current implementation.
    In addition, we will also leverage this event to report ZK data propagation latency. The related change has been split into another RB.
---
 .../helix/manager/zk/zookeeper/ZkClient.java       | 110 ++++++++++++++-------
 1 file changed, 73 insertions(+), 37 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 03de880..8718303 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -10,6 +10,7 @@
  */
 package org.apache.helix.manager.zk.zookeeper;
 
+import javax.management.JMException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.Date;
@@ -21,7 +22,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
-import javax.management.JMException;
+
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.ExceptionUtil;
 import org.I0Itec.zkclient.IZkChildListener;
@@ -129,6 +130,25 @@ public class ZkClient implements Watcher {
     }
   }
 
+  /**
+   * Holds the result of a single stat lookup for a path so that it can be shared instead of being
+   * looked up again. The record starts out unchecked; once {@link #recordPathStat(Stat)} has been
+   * called it reports the path as checked, and as existing only if the recorded stat is non-null.
+   * Declared static because it does not use any state of the enclosing client.
+   */
+  private static class ZkPathStatRecord {
+    private Stat _stat = null;
+    private boolean _checked = false;
+
+    /** @return true if the last recorded stat is non-null */
+    public boolean pathExists() {
+      return _stat != null;
+    }
+
+    /** @return true once a stat (possibly null) has been recorded */
+    public boolean pathChecked() {
+      return _checked;
+    }
+
+    /**
+     * Records the stat of the path and marks the path as checked.
+     * @param stat the stat to record; may be null
+     */
+    public void recordPathStat(Stat stat) {
+      _checked = true;
+      _stat = stat;
+    }
+  }
+
+
   protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
       String monitorInstanceName, boolean monitorRootPathOnly) {
@@ -754,15 +774,14 @@ public class ZkClient implements Watcher {
   }
 
   public Stat getStat(final String path) {
+    return getStat(path, false);
+  }
+
+  private Stat getStat(final String path, final boolean watch) {
     long startT = System.currentTimeMillis();
     try {
-      Stat stat = retryUntilConnected(new Callable<Stat>() {
-        @Override
-        public Stat call() throws Exception {
-          Stat stat = ((ZkConnection) getConnection()).getZookeeper().exists(path, false);
-          return stat;
-        }
-      });
+      Stat stat = retryUntilConnected(
+          () -> ((ZkConnection) getConnection()).getZookeeper().exists(path, watch));
       record(path, null, startT, ZkClientMonitor.AccessType.READ);
       return stat;
     } catch (Exception e) {
@@ -945,55 +964,72 @@ public class ZkClient implements Watcher {
   }
 
   private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners) {
-    for (final IZkDataListenerEntry listener : listeners) {
-      _eventThread.send(new ZkEvent(
-          "Data of " + path + " changed sent to " + listener.getDataListener() + " prefetch data: "
-              + listener.isPrefetchData()) {
-
-        @Override public void run() throws Exception {
-          // reinstall watch
-          boolean exist = exists(path, true);
-          if (exist) {
-            try {
+    try {
+      final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord();
+      // Trigger listener callbacks
+      for (final IZkDataListenerEntry listener : listeners) {
+        _eventThread.send(new ZkEvent(
+            "Data of " + path + " changed sent to " + listener.getDataListener()
+                + " prefetch data: " + listener.isPrefetchData()) {
+          @Override
+          public void run() throws Exception {
+            // Reinstall watch before listener callbacks to check the znode status
+            if (!pathStatRecord.pathChecked()) {
+              pathStatRecord.recordPathStat(getStat(path, true));
+            }
+            if (!pathStatRecord.pathExists()) {
+              // no znode found at the path, trigger data deleted handler.
+              listener.getDataListener().handleDataDeleted(path);
+            } else {
               Object data = null;
               if (listener.isPrefetchData()) {
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Prefetch data for path: " + path);
+                  LOG.debug("Prefetch data for path: {}", path);
+                }
+                try {
+                  data = readData(path, null, true);
+                } catch (ZkNoNodeException e) {
+                  LOG.warn("Prefetch data for path: {} failed.", path, e);
+                  listener.getDataListener().handleDataDeleted(path);
+                  return;
                 }
-                data = readData(path, null, true);
               }
               listener.getDataListener().handleDataChange(path, data);
-            } catch (ZkNoNodeException e) {
-              listener.getDataListener().handleDataDeleted(path);
             }
-          } else {
-            listener.getDataListener().handleDataDeleted(path);
           }
-        }
-      });
+        });
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to fire data changed event for path: {}", path, e);
     }
   }
 
   private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners) {
     try {
-      // reinstall the watch
+      final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord();
       for (final IZkChildListener listener : childListeners) {
         _eventThread.send(new ZkEvent("Children of " + path + " changed sent to " + listener) {
-
-          @Override public void run() throws Exception {
-            try {
-              // if the node doesn't exist we should listen for the root node to reappear
-              exists(path);
-              List<String> children = getChildren(path);
-              listener.handleChildChange(path, children);
-            } catch (ZkNoNodeException e) {
-              listener.handleChildChange(path, null);
+          @Override
+          public void run() throws Exception {
+            // Reinstall watch before listener callbacks to check the znode status
+            if (!pathStatRecord.pathChecked()) {
+              pathStatRecord.recordPathStat(getStat(path, hasListeners(path)));
+            }
+            List<String> children = null;
+            if (pathStatRecord.pathExists()) {
+              try {
+                children = getChildren(path);
+              } catch (ZkNoNodeException e) {
+                LOG.warn("Get children under path: {} failed.", path, e);
+                // Continue trigger the change handler
+              }
             }
+            listener.handleChildChange(path, children);
           }
         });
       }
     } catch (Exception e) {
-      LOG.error("Failed to fire child changed event. Unable to getChildren.  ", e);
+      LOG.error("Failed to fire child changed event. Unable to getChildren.", e);
     }
   }
 


[helix] 06/15: Fix check for disabled partitions

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 8abfe67fa5612cae39f591c2c0bd7a41b48b91e7
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Wed May 29 18:19:43 2019 -0700

    Fix check for disabled partitions
    
    For the map field of disabled partitions, even when all partitions are enabled, some resource keys could be left over. We cannot just check whether there are any resource entries. With this fix, Helix loops over all the resource entries of the disabled partitions map to see whether any partition list is non-empty.
    
    In addition, fix failed tests in REST.
    
    RB=1683071
    BUG=HELIX-1910
    G=helix-reviewers
    A=hulee
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../org/apache/helix/util/InstanceValidationUtil.java     | 11 ++++++++++-
 .../org/apache/helix/util/TestInstanceValidationUtil.java | 15 ++++++++++++++-
 .../apache/helix/rest/server/TestInstancesAccessor.java   |  9 ++++++---
 3 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
index 7239eeb..b373469 100644
--- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
@@ -130,7 +130,16 @@ public class InstanceValidationUtil {
     PropertyKey propertyKey = dataAccessor.keyBuilder().instanceConfig(instanceName);
     InstanceConfig instanceConfig = dataAccessor.getProperty(propertyKey);
     if (instanceConfig != null) {
-      return !instanceConfig.getDisabledPartitionsMap().isEmpty();
+      // Originally, Helix only checked whether the disabled partition map had any entries. This
+      // could cause problems when some partitions are disabled and then enabled back, because the
+      // resource entries are still left there. For a detailed check, we check whether any
+      // partition list is actually non-empty.
+      for (List<String> disabledPartitions : instanceConfig.getDisabledPartitionsMap().values()) {
+        if (disabledPartitions != null && disabledPartitions.size() > 0) {
+          return true;
+        }
+      }
+      return false;
     }
 
     throw new HelixException("Fail to get instance config for " + instanceName);
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
index 2b193b4..be5d136 100644
--- a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
@@ -2,6 +2,7 @@ package org.apache.helix.util;
 
 import static org.mockito.Mockito.*;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -158,11 +159,23 @@ public class TestInstanceValidationUtil {
   }
 
   @Test
+  public void TestHasDisabledPartitions_with_only_names() {
+    Mock mock = new Mock();
+    InstanceConfig instanceConfig = mock(InstanceConfig.class);
+    when(instanceConfig.getDisabledPartitionsMap())
+        .thenReturn(ImmutableMap.of("db0", Collections.emptyList()));
+    when(mock.dataAccessor.getProperty(any(PropertyKey.class))).thenReturn(instanceConfig);
+
+    Assert.assertFalse(InstanceValidationUtil.hasDisabledPartitions(mock.dataAccessor, TEST_CLUSTER,
+        TEST_INSTANCE));
+  }
+
+  @Test
   public void TestHasDisabledPartitions_true() {
     Mock mock = new Mock();
     InstanceConfig instanceConfig = mock(InstanceConfig.class);
     when(instanceConfig.getDisabledPartitionsMap())
-        .thenReturn(ImmutableMap.of("db0", Collections.<String> emptyList()));
+        .thenReturn(ImmutableMap.of("db0", Arrays.asList("p1")));
     when(mock.dataAccessor.getProperty(any(PropertyKey.class))).thenReturn(instanceConfig);
 
     Assert.assertTrue(InstanceValidationUtil.hasDisabledPartitions(mock.dataAccessor, TEST_CLUSTER,
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index c3c8691..0e86584 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -50,7 +50,7 @@ public class TestInstancesAccessor extends AbstractTestClass {
   }
 
   @Test(dependsOnMethods = "testInstancesStoppable_zoneBased")
-  public void testInstancesStoppable_disableOneInstance() {
+  public void testInstancesStoppable_disableOneInstance() throws InterruptedException {
     // Disable one selected instance0, it should failed to check
     String instance = "instance0";
     InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER, instance);
@@ -58,6 +58,11 @@ public class TestInstancesAccessor extends AbstractTestClass {
     instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", false);
     _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER, instance, instanceConfig);
 
+    // It takes time to reflect the changes.
+    BestPossibleExternalViewVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(verifier.verifyByPolling());
+
     Entity entity = Entity.entity("", MediaType.APPLICATION_JSON_TYPE);
     Response response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable")
         .format(STOPPABLE_CLUSTER, instance).post(this, entity);
@@ -69,8 +74,6 @@ public class TestInstancesAccessor extends AbstractTestClass {
     instanceConfig.setInstanceEnabled(true);
     instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", true);
     _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER, instance, instanceConfig);
-    BestPossibleExternalViewVerifier verifier =
-        new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER).setZkAddr(ZK_ADDR).build();
     Assert.assertTrue(verifier.verifyByPolling());
 
     entity = Entity.entity("", MediaType.APPLICATION_JSON_TYPE);


[helix] 15/15: Fix looping with keySet and modifying keySet same time

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 7ac3a6a1762bc74869a8f7ea0a9c7feed8b3ba0a
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Fri Jun 21 11:58:13 2019 -0700

    Fix looping with keySet and modifying keySet same time
    
    Looping over the keySet while removing entries from the same map could cause a ConcurrentModificationException. Fix this by collecting the keys to remove in a separate Set and removing them after the loop is done.
    
    RB=1711266
    G=helix-reviewers
    A=jjwang,hulee,ksun
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../java/org/apache/helix/common/caches/InstanceMessagesCache.java  | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index 21fba5b..97ece27 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Sets;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -170,7 +171,7 @@ public class InstanceMessagesCache {
     long nextRebalanceTime = Long.MAX_VALUE;
     long currentTime = System.currentTimeMillis();
     Map<String, Map<String, Message>> relayMessageMap = new HashMap<>();
-
+    Set<String> targetInstanceToRemove = new HashSet<>();
     // Iterate all relay message in the cache, remove invalid or expired ones.
     for (String targetInstance : _relayMessageCache.keySet()) {
       Map<String, Message> relayMessages = _relayMessageCache.get(targetInstance);
@@ -261,9 +262,10 @@ public class InstanceMessagesCache {
       } // end while (iterator.hasNext())
 
       if (relayMessages.isEmpty()) {
-        _relayMessageCache.remove(targetInstance);
+        targetInstanceToRemove.add(targetInstance);
       }
     }
+    _relayMessageCache.keySet().removeAll(targetInstanceToRemove);
 
     if (nextRebalanceTime < Long.MAX_VALUE) {
       scheduleFuturePipeline(nextRebalanceTime);


[helix] 14/15: Always try reading from EphemeralOwner state first while reading the session ID from a live instance node.

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ad0c2edb31a4d0e5c455d1fa0f96658bac1382d5
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Mon Jun 17 14:37:02 2019 -0700

    Always try reading from EphemeralOwner state first while reading the session ID from a live instance node.
    
    This is to avoid an inconsistent session ID between the node content and the ephemeral owner state.
    Note that in order to ensure backward compatibility and keep some test cases working, the newly introduced method will still read from the node content if the ephemeral owner state is empty (-1 or 0).
    
    RB=1704942
    BUG=HELIX-1969
    G=helix-reviewers
    A=jxue
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../webapp/resources/SchedulerTasksResource.java   |  4 +-
 .../helix/webapp/TestResetPartitionState.java      |  2 +-
 .../helix/common/caches/CurrentStateCache.java     |  2 +-
 .../helix/common/caches/InstanceMessagesCache.java |  4 +-
 .../helix/controller/GenericHelixController.java   |  2 +-
 .../dataproviders/BaseControllerDataProvider.java  |  2 +-
 .../stages/CurrentStateComputationStage.java       |  6 +-
 .../controller/stages/MessageGenerationPhase.java  |  2 +-
 .../stages/ResourceComputationStage.java           |  2 +-
 .../stages/TopStateHandoffReportStage.java         |  8 +--
 .../manager/zk/DistributedLeaderElection.java      | 19 +++---
 .../helix/manager/zk/ParticipantManager.java       | 21 +++++--
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  2 +-
 .../apache/helix/manager/zk/ZKHelixManager.java    |  4 +-
 .../helix/messaging/DefaultMessagingService.java   |  2 +-
 .../java/org/apache/helix/model/LiveInstance.java  | 30 ++++++++--
 .../org/apache/helix/spectator/RoutingTable.java   |  2 +-
 .../helix/spectator/RoutingTableProvider.java      |  2 +-
 .../apache/helix/util/InstanceValidationUtil.java  |  4 +-
 .../java/org/apache/helix/common/ZkTestBase.java   | 63 +++++++++++++++++---
 .../controller/stages/TestRebalancePipeline.java   | 31 +++++-----
 .../helix/integration/TestCleanupExternalView.java |  4 +-
 .../TestMessagePartitionStateMismatch.java         |  2 +-
 .../TestNoThrottleDisabledPartitions.java          |  6 +-
 .../helix/integration/TestResetPartitionState.java |  2 +-
 .../integration/TestSyncSessionToController.java   | 33 ++++++++++-
 .../messaging/TestP2PMessageSemiAuto.java          |  2 +-
 .../messaging/TestP2PNoDuplicatedMessage.java      |  4 +-
 .../paticipant/TestInstanceCurrentState.java       |  2 +-
 .../helix/manager/zk/TestHandleNewSession.java     | 68 ++++++++--------------
 .../helix/manager/zk/TestZkClusterManager.java     | 10 ++--
 .../messaging/p2pMessage/TestP2PMessages.java      |  6 +-
 .../rest/server/service/InstanceServiceImpl.java   |  2 +-
 33 files changed, 220 insertions(+), 135 deletions(-)

diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
index 5c4e44e..60a5268 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
@@ -95,7 +95,7 @@ public class SchedulerTasksResource extends ServerResource {
         ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
     LiveInstance liveInstance =
         accessor.getProperty(accessor.keyBuilder().liveInstance(instanceName));
-    String sessionId = liveInstance.getSessionId();
+    String sessionId = liveInstance.getEphemeralOwner();
 
     StringRepresentation representation = new StringRepresentation("");// (ClusterRepresentationUtil.ObjectToJson(instanceConfigs),
                                                                        // MediaType.APPLICATION_JSON);
@@ -136,7 +136,7 @@ public class SchedulerTasksResource extends ServerResource {
 
       schedulerMessage.getRecord().getMapFields().put(MESSAGETEMPLATE, messageTemplate);
 
-      schedulerMessage.setTgtSessionId(leader.getSessionId());
+      schedulerMessage.setTgtSessionId(leader.getEphemeralOwner());
       schedulerMessage.setTgtName("CONTROLLER");
       schedulerMessage.setSrcInstanceType(InstanceType.CONTROLLER);
       String taskQueueName =
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java
index 01c7d3d..8d994e8 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java
@@ -186,7 +186,7 @@ public class TestResetPartitionState extends AdminTestBase {
     Builder keyBuilder = accessor.keyBuilder();
 
     LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instance));
-    accessor.removeProperty(keyBuilder.stateTransitionStatus(instance, liveInstance.getSessionId(),
+    accessor.removeProperty(keyBuilder.stateTransitionStatus(instance, liveInstance.getEphemeralOwner(),
         resource, partition));
 
   }
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
index 38ac745..e584956 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -121,7 +121,7 @@ public class CurrentStateCache extends AbstractDataCache<CurrentState> {
     Set<PropertyKey> currentStateKeys = new HashSet<>();
     for (String instanceName : liveInstanceMap.keySet()) {
       LiveInstance liveInstance = liveInstanceMap.get(instanceName);
-      String sessionId = liveInstance.getSessionId();
+      String sessionId = liveInstance.getEphemeralOwner();
       List<String> currentStateNames =
           accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
       for (String currentStateName : currentStateNames) {
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index fb21141..21fba5b 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -304,7 +304,7 @@ public class InstanceMessagesCache {
       return;
     }
 
-    String instanceSessionId = liveInstanceMap.get(targetHost).getSessionId();
+    String instanceSessionId = liveInstanceMap.get(targetHost).getEphemeralOwner();
 
     // Target host's session has been changed, remove relay message
     if (!instanceSessionId.equals(sessionId)) {
@@ -370,7 +370,7 @@ public class InstanceMessagesCache {
       setMessageRelayTime(relayMessage, currentTime);
       return;
     }
-    String instanceSessionId = liveInstanceMap.get(relayInstance).getSessionId();
+    String instanceSessionId = liveInstanceMap.get(relayInstance).getEphemeralOwner();
     if (!instanceSessionId.equals(sessionId)) {
       LOG.info("Relay instance sessionId {} does not match sessionId {} in hosted message {}, "
               + "set relay message {} to be expired.", instanceSessionId, sessionId,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index a8f2872..6c4f98e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -980,7 +980,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     Map<String, LiveInstance> curSessions = new HashMap<>();
     for (LiveInstance liveInstance : liveInstances) {
       curInstances.put(liveInstance.getInstanceName(), liveInstance);
-      curSessions.put(liveInstance.getSessionId(), liveInstance);
+      curSessions.put(liveInstance.getEphemeralOwner(), liveInstance);
     }
 
     // TODO: remove the synchronization here once we move this update into dataCache.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 9402029..a45813e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -326,7 +326,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
           .logDebug(logger, getClusterEventId(), "LiveInstances: " + getLiveInstances().keySet());
       for (LiveInstance instance : getLiveInstances().values()) {
         LogUtil.logDebug(logger, getClusterEventId(),
-            "live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
+            "live instance: " + instance.getInstanceName() + " " + instance.getEphemeralOwner());
       }
       LogUtil.logDebug(logger, getClusterEventId(), "IdealStates: " + getIdealStates().keySet());
       LogUtil.logDebug(logger, getClusterEventId(),
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 6115465..d5c65cd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -56,7 +56,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
 
     for (LiveInstance instance : liveInstances.values()) {
       String instanceName = instance.getInstanceName();
-      String instanceSessionId = instance.getSessionId();
+      String instanceSessionId = instance.getEphemeralOwner();
 
       // update pending messages
       Map<String, Message> messages = cache.getMessages(instanceName);
@@ -76,7 +76,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       CurrentStateOutput currentStateOutput, Collection<Message> pendingRelayMessages,
       Map<String, Resource> resourceMap) {
     String instanceName = instance.getInstanceName();
-    String instanceSessionId = instance.getSessionId();
+    String instanceSessionId = instance.getEphemeralOwner();
 
     // update all pending messages
     for (Message message : pendingMessages) {
@@ -166,7 +166,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
   private void updateCurrentStates(LiveInstance instance, Collection<CurrentState> currentStates,
       CurrentStateOutput currentStateOutput, Map<String, Resource> resourceMap) {
     String instanceName = instance.getInstanceName();
-    String instanceSessionId = instance.getSessionId();
+    String instanceSessionId = instance.getEphemeralOwner();
 
     for (CurrentState currentState : currentStates) {
       if (!instanceSessionId.equals(currentState.getSessionId())) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index b93a447..196dfee 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -89,7 +89,7 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
     Map<String, String> sessionIdMap = new HashMap<>();
 
     for (LiveInstance liveInstance : liveInstances.values()) {
-      sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
+      sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getEphemeralOwner());
     }
     MessageOutput output = new MessageOutput();
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 5785117..359d836 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -104,7 +104,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
     if (availableInstances != null && availableInstances.size() > 0) {
       for (LiveInstance instance : availableInstances.values()) {
         String instanceName = instance.getInstanceName();
-        String clientSessionId = instance.getSessionId();
+        String clientSessionId = instance.getEphemeralOwner();
 
         Map<String, CurrentState> currentStateMap =
             cache.getCurrentState(instanceName, clientSessionId);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
index 6fbd36e..f2b49e5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
@@ -246,7 +246,7 @@ public class TopStateHandoffReportStage extends AbstractBaseStage {
 
     // Current state output generation logic guarantees that current top state instance
     // must be a live instance
-    String curTopStateSession = cache.getLiveInstances().get(curTopStateInstance).getSessionId();
+    String curTopStateSession = cache.getLiveInstances().get(curTopStateInstance).getEphemeralOwner();
     long endTime =
         cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName)
             .getEndTime(partition.getPartitionName());
@@ -260,7 +260,7 @@ public class TopStateHandoffReportStage extends AbstractBaseStage {
     // Make sure last top state instance has not bounced during cluster data cache refresh
     if (cache.getLiveInstances().containsKey(lastTopStateInstance)) {
       String lastTopStateSession =
-          cache.getLiveInstances().get(lastTopStateInstance).getSessionId();
+          cache.getLiveInstances().get(lastTopStateInstance).getEphemeralOwner();
       // We need this null check as there are test cases creating incomplete current state
       if (cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
           != null) {
@@ -363,7 +363,7 @@ public class TopStateHandoffReportStage extends AbstractBaseStage {
       Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
       if (liveInstances.containsKey(missingStateInstance)) {
         CurrentState currentState = cache.getCurrentState(missingStateInstance,
-            liveInstances.get(missingStateInstance).getSessionId()).get(resourceName);
+            liveInstances.get(missingStateInstance).getEphemeralOwner()).get(resourceName);
 
         if (currentState != null
             && currentState.getPreviousState(partition.getPartitionName()) != null && currentState
@@ -460,7 +460,7 @@ public class TopStateHandoffReportStage extends AbstractBaseStage {
     Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
     for (String instanceName : stateMap.keySet()) {
       CurrentState currentState =
-          cache.getCurrentState(instanceName, liveInstances.get(instanceName).getSessionId())
+          cache.getCurrentState(instanceName, liveInstances.get(instanceName).getEphemeralOwner())
               .get(resourceName);
       if (currentState.getState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
         if (currentState.getEndTime(partition.getPartitionName()) <= handOffEndTime) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index 9d9d522..179524f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -143,14 +143,14 @@ public class DistributedLeaderElection implements ControllerChangeListener {
 
     LiveInstance currentLeader = accessor.getProperty(keyBuilder.controllerLeader());
     if (currentLeader != null) {
-      String currentSession = currentLeader.getSessionId();
+      String currentSession = currentLeader.getEphemeralOwner();
       LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
           + currentLeader.getInstanceName() + ", leaderSessionId: " + currentSession);
-      if (currentSession != null && currentSession.equals(newLeader.getSessionId())) {
+      if (currentSession != null && currentSession.equals(newLeader.getEphemeralOwner())) {
         return true;
       } else {
         LOG.warn("The existing leader has a different session. Expected session Id: " + newLeader
-            .getSessionId());
+            .getEphemeralOwner());
       }
     }
     return false;
@@ -165,15 +165,12 @@ public class DistributedLeaderElection implements ControllerChangeListener {
 
     // Record a MaintenanceSignal history
     if (!accessor.getBaseDataAccessor().update(keyBuilder.controllerLeaderHistory().getPath(),
-        new DataUpdater<ZNRecord>() {
-          @Override
-          public ZNRecord update(ZNRecord oldRecord) {
-            if (oldRecord == null) {
-              oldRecord = new ZNRecord(PropertyType.HISTORY.toString());
-            }
-            return new ControllerHistory(oldRecord).updateHistory(clusterName, instanceName,
-                version);
+        oldRecord -> {
+          if (oldRecord == null) {
+            oldRecord = new ZNRecord(PropertyType.HISTORY.toString());
           }
+          return new ControllerHistory(oldRecord).updateHistory(clusterName, instanceName,
+              version);
         }, AccessOption.PERSISTENT)) {
       LOG.error("Failed to persist leader history to ZK!");
     }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 28641e0..76cb791 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -43,17 +43,17 @@ import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.data.Stat;
 
 /**
  * Class to handle all session related work for a participant.
@@ -110,6 +110,8 @@ public class ParticipantManager {
       callback.onPreConnect();
     }
 
+    // TODO create live instance node after all the init works done --JJ
+    // This will help to prevent controller from sending any message prematurely.
     createLiveInstance();
     carryOverPreviousCurrentState();
 
@@ -181,7 +183,7 @@ public class ParticipantManager {
       retry = false;
       try {
         _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
-        LOG.info("LiveInstance created, path: " + liveInstancePath + ", sessionId: " + liveInstance.getSessionId());
+        LOG.info("LiveInstance created, path: " + liveInstancePath + ", sessionId: " + liveInstance.getEphemeralOwner());
       } catch (ZkNodeExistsException e) {
         LOG.warn("found another instance with same instanceName: " + _instanceName + " in cluster "
             + _clusterName);
@@ -200,14 +202,14 @@ public class ParticipantManager {
              * update sessionId field in live-instance if necessary
              */
             LiveInstance curLiveInstance = new LiveInstance(record);
-            if (!curLiveInstance.getSessionId().equals(_sessionId)) {
+            if (!curLiveInstance.getEphemeralOwner().equals(_sessionId)) {
               /**
                * in last handle-new-session,
                * live-instance is created by new zkconnection with stale session-id inside
                * just update session-id field
                */
               LOG.info("overwriting session-id by ephemeralOwner: " + ephemeralOwner
-                  + ", old-sessionId: " + curLiveInstance.getSessionId() + ", new-sessionId: "
+                  + ", old-sessionId: " + curLiveInstance.getEphemeralOwner() + ", new-sessionId: "
                   + _sessionId);
 
               curLiveInstance.setSessionId(_sessionId);
@@ -239,7 +241,8 @@ public class ParticipantManager {
     if (retry) {
       try {
         _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
-        LOG.info("LiveInstance created, path: " + liveInstancePath + ", sessionId: " + liveInstance.getSessionId());
+        LOG.info("LiveInstance created, path: " + liveInstancePath + ", sessionId: " + liveInstance
+            .getEphemeralOwner());
       } catch (Exception e) {
         String errorMessage =
             "instance: " + _instanceName + " already has a live-instance in cluster "
@@ -252,6 +255,12 @@ public class ParticipantManager {
     ParticipantHistory history = getHistory();
     history.reportOnline(_sessionId, _manager.getVersion());
     persistHistory(history);
+
+    if (!liveInstance.getEphemeralOwner().equals(liveInstance.getSessionId())) {
+      LOG.warn(
+          "Session ID {} (Deprecated) in the znode does not match the Ephemeral Owner session ID {}. Will use the Ephemeral Owner session ID.",
+          liveInstance.getSessionId(), liveInstance.getEphemeralOwner());
+    }
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index dfb8ba8..74e041c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -542,7 +542,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     // check partition is in ERROR state
-    String sessionId = liveInstance.getSessionId();
+    String sessionId = liveInstance.getEphemeralOwner();
     CurrentState curState =
         accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName));
     for (String partitionName : resetPartitionNames) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 8ebf338..2fba83c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -871,7 +871,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
       if (leader != null) {
         String leaderName = leader.getInstanceName();
-        String sessionId = Long.toHexString(leader.getRecord().getEphemeralOwner());
+        String sessionId = leader.getEphemeralOwner();
         if (leaderName != null && leaderName.equals(_instanceName) && sessionId
             .equals(_sessionId)) {
           return true;
@@ -896,7 +896,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       String path = PropertyPathBuilder.propertyStore(_clusterName);
       String fallbackPath = String.format("/%s/%s", _clusterName, "HELIX_PROPERTYSTORE");
       _helixPropertyStore =
-          new AutoFallbackPropertyStore<>(new ZkBaseDataAccessor<ZNRecord>(_zkclient),
+          new AutoFallbackPropertyStore<>(new ZkBaseDataAccessor<>(_zkclient),
               path, fallbackPath);
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index 09efadc..84aeafb 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -189,7 +189,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
         List<LiveInstance> liveInstances = targetDataAccessor.getChildValues(keyBuilder.liveInstances());
 
         for (LiveInstance liveInstance : liveInstances) {
-          sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
+          sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getEphemeralOwner());
         }
       }
       for (Map<String, String> map : matchedList) {
diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
index 185bfc5..2bf8e88 100644
--- a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
@@ -66,24 +66,44 @@ public class LiveInstance extends HelixProperty {
   }
 
   /**
-   * TODO Deprecate this method since session Id should always read from node stat. It should be readonly. -- JJ
-   * Set the session that this instance corresponds to
+   * Set the session that this instance corresponds to in the content of the LiveInstance node.
+   * Please only use this method for testing or logging purposes. The source of truth should be the node's ephemeral owner.
    * @param sessionId session identifier
    */
+  @Deprecated
   public void setSessionId(String sessionId) {
     _record.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), sessionId);
   }
 
   /**
-   * TODO Read the session Id directly from node stat. -- JJ
-   * Get the session that this instance corresponds to
+   * Get the session that this instance corresponds to from the content of the LiveInstance node.
+   * Please only use this method for testing or logging purposes. The source of truth should be the node's ephemeral owner.
    * @return session identifier
    */
+  @Deprecated
   public String getSessionId() {
     return _record.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
   }
 
   /**
+   * Get the ephemeral owner of the LiveInstance node.
+   *
+   * TODO This field should be "SessionId", we used getEphemeralOwner to avoid conflict with the existing method.
+   * Note that we cannot rename or remove the existing method for backward compatibility for now.
+   * Once the Deprecated method is removed, we shall change the name back to getSessionId.
+   *
+   * @return session identifier
+   */
+  public String getEphemeralOwner() {
+    long ephemeralOwner = _record.getEphemeralOwner();
+    if (ephemeralOwner <= 0) {
+      // For backward compatibility, if the ephemeral owner is empty in ZNRecord, still read from node content.
+      return _record.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
+    }
+    return Long.toHexString(ephemeralOwner);
+  }
+
+  /**
    * Get the name of this instance
    * @return the instance name
    */
@@ -171,7 +191,7 @@ public class LiveInstance extends HelixProperty {
 
   @Override
   public boolean isValid() {
-    if (getSessionId() == null) {
+    if (getEphemeralOwner() == null) {
       _logger.error("liveInstance does not have session id. id:" + _record.getId());
       return false;
     }
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
index 115131b..48c1fc3 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
@@ -114,7 +114,7 @@ class RoutingTable {
       }
       for (LiveInstance liveInstance : _liveInstances) {
         String instanceName = liveInstance.getInstanceName();
-        String sessionId = liveInstance.getSessionId();
+        String sessionId = liveInstance.getEphemeralOwner();
         InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
         if (instanceConfig == null) {
           logger.warn(
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 5338145..ee0e638 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -486,7 +486,7 @@ public class RoutingTableProvider
 
     Map<String, LiveInstance> curSessions = new HashMap<>();
     for (LiveInstance liveInstance : liveInstances) {
-      curSessions.put(liveInstance.getSessionId(), liveInstance);
+      curSessions.put(liveInstance.getEphemeralOwner(), liveInstance);
     }
 
     // Go though the live instance list and update CurrentState listeners
diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
index b373469..dfc775d 100644
--- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
@@ -102,7 +102,7 @@ public class InstanceValidationUtil {
     PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
     LiveInstance liveInstance = dataAccessor.getProperty(propertyKeyBuilder.liveInstance(instanceName));
     if (liveInstance != null) {
-      String sessionId = liveInstance.getSessionId();
+      String sessionId = liveInstance.getEphemeralOwner();
 
       List<String> resourceNames = dataAccessor.getChildNames(propertyKeyBuilder.currentStates(instanceName, sessionId));
       for (String resourceName : resourceNames) {
@@ -172,7 +172,7 @@ public class InstanceValidationUtil {
     PropertyKey liveInstanceKey = propertyKeyBuilder.liveInstance(instanceName);
     LiveInstance liveInstance = dataAccessor.getProperty(liveInstanceKey);
     if (liveInstance != null) {
-      String sessionId = liveInstance.getSessionId();
+      String sessionId = liveInstance.getEphemeralOwner();
 
       PropertyKey currentStatesKey = propertyKeyBuilder.currentStates(instanceName, sessionId);
       List<String> resourceNames = dataAccessor.getChildNames(currentStatesKey);
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 8160b08..03338b4 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -23,7 +23,9 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -86,6 +88,7 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.ITestContext;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeClass;
@@ -101,6 +104,8 @@ public class ZkTestBase {
   protected static BaseDataAccessor<ZNRecord> _baseAccessor;
   protected static MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
 
+  private Map<String, Map<String, HelixZkClient>> _liveInstanceOwners = new HashMap<>();
+
   public static final String ZK_ADDR = "localhost:2183";
   protected static final String CLUSTER_PREFIX = "CLUSTER";
   protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
@@ -156,7 +161,6 @@ public class ZkTestBase {
   @BeforeClass
   public void beforeClass() throws Exception {
     cleanupJMXObjects();
-
     // Giving each test some time to settle (such as gc pause, etc).
     // Note that this is the best effort we could make to stabilize tests, not a complete solution
     Runtime.getRuntime().gc();
@@ -634,18 +638,48 @@ public class ZkTestBase {
     return idealStates;
   }
 
-  protected void setupLiveInstances(String clusterName, int[] liveInstances) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+  @AfterClass
+  public void cleanupLiveInstanceOwners() {
+    for (String cluster : _liveInstanceOwners.keySet()) {
+      Map<String, HelixZkClient> clientMap = _liveInstanceOwners.get(cluster);
+      for (HelixZkClient client : clientMap.values()) {
+        client.close();
+      }
+      clientMap.clear();
+    }
+    _liveInstanceOwners.clear();
+  }
+
+  protected List<LiveInstance> setupLiveInstances(String clusterName, int[] liveInstances) {
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+
+    List<LiveInstance> result = new ArrayList<>();
 
     for (int i = 0; i < liveInstances.length; i++) {
       String instance = "localhost_" + liveInstances[i];
+
+      _liveInstanceOwners.putIfAbsent(clusterName, new HashMap<>());
+      Map<String, HelixZkClient> clientMap = _liveInstanceOwners.get(clusterName);
+      clientMap.putIfAbsent(instance, DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig));
+      HelixZkClient client = clientMap.get(instance);
+
+          ZKHelixDataAccessor accessor =
+          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(client));
+      Builder keyBuilder = accessor.keyBuilder();
+
       LiveInstance liveInstance = new LiveInstance(instance);
-      liveInstance.setSessionId("session_" + liveInstances[i]);
-      liveInstance.setHelixVersion("0.0.0");
+      // Keep setting the session id in the deprecated field for ensure the same behavior as a real participant.
+      // Note the participant is doing so for backward compatibility.
+      liveInstance.setSessionId(Long.toHexString(client.getSessionId()));
+      // Please refer to the version requirement here: helix-core/src/main/resources/cluster-manager-version.properties
+      // Ensuring version compatibility can avoid the warning message during test.
+      liveInstance.setHelixVersion("0.4");
       accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
+      result.add(accessor.getProperty(keyBuilder.liveInstance(instance)));
     }
+    return result;
   }
 
   protected void deleteLiveInstances(String clusterName) {
@@ -653,8 +687,21 @@ public class ZkTestBase {
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
+    Map<String, HelixZkClient> clientMap = _liveInstanceOwners.getOrDefault(clusterName, Collections.emptyMap());
+
     for (String liveInstance : accessor.getChildNames(keyBuilder.liveInstances())) {
-      accessor.removeProperty(keyBuilder.liveInstance(liveInstance));
+      ZKHelixDataAccessor dataAccessor =
+          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+      dataAccessor.removeProperty(keyBuilder.liveInstance(liveInstance));
+
+      HelixZkClient client = clientMap.remove(liveInstance);
+      if (client != null) {
+        client.close();
+      }
+    }
+
+    if (clientMap.isEmpty()) {
+      _liveInstanceOwners.remove(clusterName);
     }
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index c3541af..d90c228 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -40,6 +40,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.testng.Assert;
@@ -71,7 +72,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0
     }, resourceGroups, 1, 1);
-    setupLiveInstances(clusterName, new int[] {
+    List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
         0
     });
     setupStateModel(clusterName);
@@ -92,7 +93,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceMessageDispatchStage());
 
     // round1: set node0 currentState to OFFLINE
-    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
+    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "OFFLINE");
 
     runPipeline(event, dataRefresh);
@@ -147,7 +148,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupInstances(clusterName, new int[] {
         0, 1
     });
-    setupLiveInstances(clusterName, new int[] {
+    List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
         0, 1
     });
 
@@ -169,9 +170,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     // Since controller's rebalancer pipeline will GC pending messages after timeout, and both hosts
     // update current states to SLAVE, controller will send out rebalance message to
     // have one host to become master
-    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
+    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "SLAVE", true);
-    setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", "session_1",
+    setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", liveInstances.get(1).getEphemeralOwner(),
         "SLAVE", true);
 
     // Controller has timeout > 1sec, so after 1s, controller should not have GCed message
@@ -182,7 +183,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     // After another purge delay, controller should cleanup messages and continue to rebalance
     Thread.sleep(msgPurgeDelay);
     // Manually trigger another rebalance by touching current state
-    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
+    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "SLAVE");
     Thread.sleep(1000);
 
@@ -196,11 +197,11 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     // round3: node0 changes state to master, but failed to delete message,
     // controller will clean it up
-    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
+    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "MASTER", true);
     Thread.sleep(msgPurgeDelay);
     // touch current state to trigger rebalance
-    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
+    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "MASTER", false);
     Thread.sleep(1000);
     Assert.assertTrue(accessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty());
@@ -259,7 +260,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0
     }, resourceGroups, 1, 1);
-    setupLiveInstances(clusterName, new int[] {
+    List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
         0
     });
     setupStateModel(clusterName);
@@ -280,7 +281,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceMessageDispatchStage());
 
     // round1: set node0 currentState to OFFLINE and node1 currentState to SLAVE
-    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
+    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "OFFLINE");
 
     runPipeline(event, dataRefresh);
@@ -355,7 +356,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0, 1
     }, resourceGroups, 1, 2);
-    setupLiveInstances(clusterName, new int[] {
+    List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
         1
     });
     setupStateModel(clusterName);
@@ -376,7 +377,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceMessageDispatchStage());
 
     // round1: set node1 currentState to SLAVE
-    setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", "session_1",
+    setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "SLAVE");
 
     runPipeline(event, dataRefresh);
@@ -433,7 +434,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0, 1
     }, resourceGroups, 1, 2);
-    setupLiveInstances(clusterName, new int[] {
+    List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
         0, 1
     });
     setupStateModel(clusterName);
@@ -457,9 +458,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     // Helix will try to switch the state of the two instances, but it should not be two MASTER at
     // the same time
     // so it should first transit M->S, then transit another instance S->M
-    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
+    setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "SLAVE");
-    setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", "session_1",
+    setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", liveInstances.get(1).getEphemeralOwner(),
         "MASTER");
 
     runPipeline(event, dataRefresh);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
index b08e3ef..751128a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
@@ -98,11 +98,11 @@ public class TestCleanupExternalView extends ZkUnitTestBase {
 
     // System.out.println("remove current-state");
     LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance("localhost_12918"));
-    accessor.removeProperty(keyBuilder.currentState("localhost_12918", liveInstance.getSessionId(),
+    accessor.removeProperty(keyBuilder.currentState("localhost_12918", liveInstance.getEphemeralOwner(),
         "TestDB0"));
     liveInstance = accessor.getProperty(keyBuilder.liveInstance("localhost_12919"));
     accessor.removeProperty(
-        keyBuilder.currentState("localhost_12919", liveInstance.getSessionId(), "TestDB0"));
+        keyBuilder.currentState("localhost_12919", liveInstance.getEphemeralOwner(), "TestDB0"));
 
     // re-enable controller shall remove orphan external-view
     // System.out.println("re-enabling controller");
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
index 1413eea..56b5ca0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
@@ -47,7 +47,7 @@ public class TestMessagePartitionStateMismatch extends ZkStandAloneCMTestBase {
         accessor.getChildValuesMap(accessor.keyBuilder().liveInstances());
 
     for (String instanceName : liveinstanceMap.keySet()) {
-      String sessionid = liveinstanceMap.get(instanceName).getSessionId();
+      String sessionid = liveinstanceMap.get(instanceName).getEphemeralOwner();
       for (String partition : ev.getPartitionSet()) {
         if (ev.getStateMap(partition).containsKey(instanceName)) {
           String uuid = UUID.randomUUID().toString();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
index fa0fe6e..50d789e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
@@ -87,7 +87,7 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
         _accessor.keyBuilder().liveInstance(_participants[0].getInstanceName());
     LiveInstance liveInstance = _accessor.getProperty(liveInstanceKey);
     if (liveInstance != null) {
-      String sessionId = liveInstance.getSessionId();
+      String sessionId = liveInstance.getEphemeralOwner();
       List<CurrentState> currentStates = _accessor.getChildValues(
           _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId));
       for (CurrentState currentState : currentStates) {
@@ -141,7 +141,7 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
         _accessor.keyBuilder().liveInstance(_participants[0].getInstanceName());
     LiveInstance liveInstance = _accessor.getProperty(liveInstanceKey);
     if (liveInstance != null) {
-      String sessionId = liveInstance.getSessionId();
+      String sessionId = liveInstance.getEphemeralOwner();
       List<CurrentState> currentStates = _accessor.getChildValues(
           _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId));
       for (CurrentState currentState : currentStates) {
@@ -200,7 +200,7 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
         _accessor.keyBuilder().liveInstance(_participants[0].getInstanceName());
     LiveInstance liveInstance = _accessor.getProperty(liveInstanceKey);
     if (liveInstance != null) {
-      String sessionId = liveInstance.getSessionId();
+      String sessionId = liveInstance.getEphemeralOwner();
       List<CurrentState> currentStates = _accessor.getChildValues(
           _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId));
       for (CurrentState currentState : currentStates) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
index 06fbdd4..f9a6111 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
@@ -189,7 +189,7 @@ public class TestResetPartitionState extends ZkTestBase {
     Builder keyBuilder = accessor.keyBuilder();
 
     LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instance));
-    accessor.removeProperty(keyBuilder.stateTransitionStatus(instance, liveInstance.getSessionId(),
+    accessor.removeProperty(keyBuilder.stateTransitionStatus(instance, liveInstance.getEphemeralOwner(),
         resource, partition));
 
   }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java b/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java
index 5d34a1c..322a185 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java
@@ -21,15 +21,20 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 import java.util.List;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.InstanceType;
-import org.apache.helix.MessageListener;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.Message;
@@ -80,7 +85,31 @@ public class TestSyncSessionToController extends ZkTestBase {
     data.getSimpleFields().put("SESSION_ID", "invalid-id");
     accessor.set(path, data, 2);
     Thread.sleep(2000);
-    Assert.assertTrue(mockMessageListener.isSessionSyncMessageSent());
+    // Since we always read the content from ephemeral nodes, sync message won't be sent
+    Assert.assertFalse(mockMessageListener.isSessionSyncMessageSent());
+
+    // Even after reconnect, session sync won't happen
+    ZkTestHelper.expireSession(participants[0].getZkClient());
+    Assert.assertFalse(mockMessageListener.isSessionSyncMessageSent());
+
+    // Inject a message with an invalid target session ID to trigger the sync message
+    PropertyKey messageKey = keyBuilder.message("localhost_12918", "Mocked Invalid Message");
+    Message msg = new Message(Message.MessageType.STATE_TRANSITION, "Mocked Invalid Message");
+    msg.setSrcName(controller.getInstanceName());
+    msg.setTgtSessionId("invalid-id");
+    msg.setMsgState(Message.MessageState.NEW);
+    msg.setMsgId("Mocked Invalid Message");
+    msg.setTgtName("localhost_12918");
+    msg.setPartitionName("foo");
+    msg.setResourceName("bar");
+    msg.setFromState("SLAVE");
+    msg.setToState("MASTER");
+    msg.setSrcSessionId(controller.getSessionId());
+    msg.setStateModelDef("MasterSlave");
+    msg.setStateModelFactoryName("DEFAULT");
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, accessor);
+    dataAccessor.setProperty(messageKey, msg);
+    Assert.assertTrue(TestHelper.verify(() -> mockMessageListener.isSessionSyncMessageSent(), 1500));
 
     // Cleanup
     controller.syncStop();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
index cbe3504..9292e08 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
@@ -208,7 +208,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
     LiveInstance liveInstance = liveInstanceMap.get(instance);
 
     Map<String, CurrentState> currentStateMap =
-        dataCache.getCurrentState(instance, liveInstance.getSessionId());
+        dataCache.getCurrentState(instance, liveInstance.getEphemeralOwner());
     Assert.assertNotNull(currentStateMap);
     CurrentState currentState = currentStateMap.get(dbName);
     Assert.assertNotNull(currentState);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
index 7f0f361..8cbc74f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -188,7 +188,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
 
     for (LiveInstance instance : liveInstanceMap.values()) {
       Map<String, CurrentState> currentStateMap =
-          dataCache.getCurrentState(instance.getInstanceName(), instance.getSessionId());
+          dataCache.getCurrentState(instance.getInstanceName(), instance.getEphemeralOwner());
       Assert.assertNotNull(currentStateMap);
       for (CurrentState currentState : currentStateMap.values()) {
         for (String partition : currentState.getPartitionStateMap().keySet()) {
@@ -214,7 +214,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
 
     for (LiveInstance instance : liveInstanceMap.values()) {
       Map<String, CurrentState> currentStateMap =
-          dataCache.getCurrentState(instance.getInstanceName(), instance.getSessionId());
+          dataCache.getCurrentState(instance.getInstanceName(), instance.getEphemeralOwner());
       Assert.assertNotNull(currentStateMap);
       for (CurrentState currentState : currentStateMap.values()) {
         for (String partition : currentState.getPartitionStateMap().keySet()) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestInstanceCurrentState.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestInstanceCurrentState.java
index 3e60a83..f1d2dfc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestInstanceCurrentState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestInstanceCurrentState.java
@@ -43,7 +43,7 @@ public class TestInstanceCurrentState extends TaskTestBase {
     LiveInstance liveInstance =
         accessor.getProperty(accessor.keyBuilder().liveInstance(instanceName));
     CurrentState currentState = accessor.getProperty(accessor.keyBuilder()
-        .currentState(instanceName, liveInstance.getSessionId(), WorkflowGenerator.DEFAULT_TGT_DB));
+        .currentState(instanceName, liveInstance.getEphemeralOwner(), WorkflowGenerator.DEFAULT_TGT_DB));
     // Test start time should happen after test start time
     Assert.assertTrue(
         currentState.getStartTime(WorkflowGenerator.DEFAULT_TGT_DB + "_0") >= _testStartTime);
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
index a6ce06b..3f5d66f 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
@@ -105,13 +105,10 @@ public class TestHandleNewSession extends ZkTestBase {
     manager.connect();
 
     // Ensure the controller successfully acquired leadership.
-    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
-      @Override
-      public boolean verify() {
-        LiveInstance liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
-        return liveInstance != null && controllerName.equals(liveInstance.getInstanceName())
-            && manager.getSessionId().equals(liveInstance.getSessionId());
-      }
+    Assert.assertTrue(TestHelper.verify(() -> {
+      LiveInstance liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
+      return liveInstance != null && controllerName.equals(liveInstance.getInstanceName())
+          && manager.getSessionId().equals(liveInstance.getEphemeralOwner());
     }, 1000));
     // Record the original connection info.
     final String originalSessionId = manager.getSessionId();
@@ -124,24 +121,17 @@ public class TestHandleNewSession extends ZkTestBase {
     accessor.removeProperty(keyBuilder.controllerLeader());
     // 3. expire the session and create a new session
     ZkTestHelper.asyncExpireSession(manager._zkclient);
-    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
-      @Override
-      public boolean verify() {
-        return !((ZkClient) manager._zkclient).getConnection().getZookeeperState().isAlive();
-      }
-    }, 3000));
+    Assert.assertTrue(TestHelper.verify(
+        () -> !((ZkClient) manager._zkclient).getConnection().getZookeeperState().isAlive(), 3000));
     // 4. start processing event again
     ((ZkClient) manager._zkclient).getEventLock().unlock();
 
     // Wait until the ZkClient has got a new session, and the original leader node gone
-    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
-      @Override
-      public boolean verify() {
-        try {
-          return !Long.toHexString(manager._zkclient.getSessionId()).equals(originalSessionId);
-        } catch (HelixException hex) {
-          return false;
-        }
+    Assert.assertTrue(TestHelper.verify(() -> {
+      try {
+        return !Long.toHexString(manager._zkclient.getSessionId()).equals(originalSessionId);
+      } catch (HelixException hex) {
+        return false;
       }
     }, 2000));
     // ensure that the manager has not process the new session event yet
@@ -151,34 +141,26 @@ public class TestHandleNewSession extends ZkTestBase {
     // Note that this is the expected behavior but NOT desired behavior. Ideally, the new node should
     // be created with the right session directly. We will need to improve this.
     // TODO We should recording session Id in the zk event so the stale events are discarded instead of processed. After this is done, there won't be invalid node.
-    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
-      @Override
-      public boolean verify() {
-        // Newly created node should have a new creating time but with old session.
-        LiveInstance invalidLeaderNode = accessor.getProperty(keyBuilder.controllerLeader());
-        // node exist
-        if (invalidLeaderNode == null)
-          return false;
-        // node is newly created
-        if (invalidLeaderNode.getStat().getCreationTime() == originalCreationTime)
-          return false;
-        // node has the same session as the old one, so it's invalid
-        if (!invalidLeaderNode.getSessionId().equals(originalSessionId))
-          return false;
-        return true;
-      }
+    Assert.assertTrue(TestHelper.verify(() -> {
+      // Newly created node should have a new creation time but still the old session.
+      LiveInstance invalidLeaderNode = accessor.getProperty(keyBuilder.controllerLeader());
+      // node exists
+      if (invalidLeaderNode == null)
+        return false;
+      // node is newly created
+      if (invalidLeaderNode.getStat().getCreationTime() == originalCreationTime)
+        return false;
+      // node has the same session as the old one, so it's invalid
+      if (!invalidLeaderNode.getSessionId().equals(originalSessionId))
+        return false;
+      return true;
     }, 2000));
     Assert.assertFalse(manager.isLeader());
 
     // 5. proceed the new session handling, so the manager will get the new session.
     manager.proceedNewSessionHandling();
     // Since the new session handling will re-create the leader node, a new valid node shall be created.
-    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
-      @Override
-      public boolean verify() {
-        return manager.isLeader();
-      }
-    }, 1000));
+    Assert.assertTrue(TestHelper.verify(() -> manager.isLeader(), 1000));
 
     manager.disconnect();
     TestHelper.dropCluster(clusterName, _gZkClient);
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index 9445312..76c4479 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -193,7 +193,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
-    Assert.assertFalse(liveInstance.getSessionId().equals("value"));
+    Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
 
     MockParticipantManager manager2 =
@@ -208,9 +208,9 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
-    Assert.assertFalse(liveInstance.getSessionId().equals("value"));
+    Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
-    String sessionId = liveInstance.getSessionId();
+    String sessionId = liveInstance.getEphemeralOwner();
 
     ZkTestHelper.expireSession(manager2.getZkClient());
     Thread.sleep(1000);
@@ -219,9 +219,9 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
-    Assert.assertFalse(liveInstance.getSessionId().equals("value"));
+    Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
-    Assert.assertFalse(sessionId.equals(liveInstance.getSessionId()));
+    Assert.assertFalse(sessionId.equals(liveInstance.getEphemeralOwner()));
 
     manager.disconnect();
     manager2.disconnect();
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
index 115acd6..72f110c 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
@@ -217,7 +217,7 @@ public class TestP2PMessages extends BaseStageTest {
     // Old master (initialMaster) failed the M->S transition,
     // but has not forward p2p message to new master (secondMaster) yet.
     // Validate: Controller should ignore the ERROR partition and send S->M message to new master.
-    String session = _dataCache.getLiveInstances().get(_initialMaster).getSessionId();
+    String session = _dataCache.getLiveInstances().get(_initialMaster).getEphemeralOwner();
     PropertyKey currentStateKey =
         new PropertyKey.Builder(_clusterName).currentState(_initialMaster, session, _db);
     CurrentState currentState = accessor.getProperty(currentStateKey);
@@ -307,7 +307,7 @@ public class TestP2PMessages extends BaseStageTest {
   private void handleMessage(String instance, String resource) {
     PropertyKey propertyKey = new PropertyKey.Builder(_clusterName).messages(instance);
     List<Message> messages = accessor.getChildValues(propertyKey);
-    String session = _dataCache.getLiveInstances().get(instance).getSessionId();
+    String session = _dataCache.getLiveInstances().get(instance).getEphemeralOwner();
 
     for (Message m : messages) {
       if (m.getResourceName().equals(resource)) {
@@ -374,7 +374,7 @@ public class TestP2PMessages extends BaseStageTest {
       Map<String, String> partitionState = partitionStateMap.get(p);
       for (String instance : partitionState.keySet()) {
         String state = partitionState.get(instance);
-        String session = _liveInstanceMap.get(instance).getSessionId();
+        String session = _liveInstanceMap.get(instance).getEphemeralOwner();
         PropertyKey currentStateKey =
             new PropertyKey.Builder(_clusterName).currentState(instance, session, resource);
         CurrentState currentState = accessor.getProperty(currentStateKey);
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
index 310e2b9..90e32d3 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
@@ -91,7 +91,7 @@ public class InstanceServiceImpl implements InstanceService {
     }
     if (liveInstance != null) {
       instanceInfoBuilder.liveInstance(liveInstance.getRecord());
-      String sessionId = liveInstance.getSessionId();
+      String sessionId = liveInstance.getEphemeralOwner();
 
       List<String> resourceNames = _dataAccessor
           .getChildNames(_dataAccessor.keyBuilder().currentStates(instanceName, sessionId));