You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/01/12 02:26:46 UTC

[helix] branch rest0 created (now e1f13b8)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a change to branch rest0
in repository https://gitbox.apache.org/repos/asf/helix.git.


      at e1f13b8  Implement ZkAccessor

This branch includes the following new commits:

     new 3f85666  Fix handleNewSession creating ephemeral node with expired session (#642)
     new c23abb1  Add method to wait and return established session's ID (#677)
     new e1f13b8  Implement ZkAccessor

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[helix] 02/03: Add method to wait and return established session's ID (#677)

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch rest0
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c23abb1b567f733a62383dedfbc7e1d4f1353595
Author: Huizhi L <ih...@gmail.com>
AuthorDate: Thu Jan 9 15:30:44 2020 -0800

    Add method to wait and return established session's ID (#677)
    
    zkClient's getSessionId() could introduce a session race condition: session A is connected in waitUntilConnected(), but by the time zkClient.getSessionId() is called in ZKHelixManager, session A might already have expired, so zkClient.getSessionId() returns session B. This session ID is critical the first time a new session is handled after the zkclient is created in ZKHelixManager.
    
    Solution: add a new method, waitForEstablishedSession(), that waits for the SyncConnected state and returns the session ID before unlocking the eventLock.
    
    Change list:
    - Add a new method waitForEstablishedSession()
    - Add a unit test to cover the new method.
---
 .../apache/helix/manager/zk/ZKHelixManager.java    | 10 +++------
 .../helix/manager/zk/client/HelixZkClient.java     | 23 +++++++++++++++++++
 .../helix/manager/zk/zookeeper/ZkClient.java       | 26 +++++++++++++++++++---
 .../apache/helix/manager/zk/TestRawZkClient.java   | 20 +++++++++++++++++
 4 files changed, 69 insertions(+), 10 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 6fabca6..0d66af8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -699,19 +699,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     int retryCount = 0;
     while (retryCount < 3) {
       try {
-        // TODO: get session id from waitUntilConnected to avoid race condition
-        if (!_zkclient.waitUntilConnected(_connectionInitTimeout, TimeUnit.MILLISECONDS)) {
-          throw new ZkTimeoutException(
-              "Unable to connect to zookeeper server within timeout: " + _connectionInitTimeout
-                  + " ms.");
-        }
+        final long sessionId =
+            _zkclient.waitForEstablishedSession(_connectionInitTimeout, TimeUnit.MILLISECONDS);
         handleStateChanged(KeeperState.SyncConnected);
         /*
          * This listener is subscribed after SyncConnected and firing new session events,
          * which means this listener has not yet handled new session, so we have to handle new
          * session here just for this listener.
          */
-        handleNewSession(ZKUtil.toHexSessionId(_zkclient.getSessionId()));
+        handleNewSession(ZKUtil.toHexSessionId(sessionId));
         break;
       } catch (HelixException e) {
         LOG.error("fail to createClient.", e);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
index f5ed25a..5f58b69 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
@@ -6,6 +6,7 @@ import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
 import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.manager.zk.BasicZkSerializer;
@@ -182,6 +183,28 @@ public interface HelixZkClient {
   // ZK state control
   boolean waitUntilConnected(long time, TimeUnit timeUnit);
 
+  /**
+   * Waits for SyncConnected state and returns a valid session ID(non-zero). The implementation of
+   * this method should wait for SyncConnected state and ZK session to be established, and should
+   * guarantee the established session's ID is returned before keeper state changes.
+   *
+   * Please note: this default implementation may have race condition issue and return an unexpected
+   * session ID that is zero or another new session's ID. The default implementation is for backward
+   * compatibility purpose.
+   *
+   * @param timeout Max waiting time for connecting to ZK server.
+   * @param timeUnit Time unit for the timeout.
+   * @return A valid ZK session ID which is non-zero.
+   */
+  default long waitForEstablishedSession(long timeout, TimeUnit timeUnit) {
+    if (!waitUntilConnected(timeout, timeUnit)) {
+      throw new ZkTimeoutException(
+          "Failed to get established session because connecting to ZK server has timed out in "
+              + timeout + " " + timeUnit);
+    }
+    return getSessionId();
+  }
+
   String getServers();
 
   long getSessionId();
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 94bccbb..a28ea83 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -1368,15 +1368,29 @@ public class ZkClient implements Watcher {
     return _connection;
   }
 
+  public long waitForEstablishedSession(long timeout, TimeUnit timeUnit) {
+    validateCurrentThread();
+
+    acquireEventLock();
+    try {
+      if (!waitForKeeperState(KeeperState.SyncConnected, timeout, timeUnit)) {
+        throw new ZkTimeoutException("Waiting to be connected to ZK server has timed out.");
+      }
+      // Reading session ID before unlocking event lock is critical to guarantee the established
+      // session's ID won't change.
+      return getSessionId();
+    } finally {
+      getEventLock().unlock();
+    }
+  }
+
   public boolean waitUntilConnected(long time, TimeUnit timeUnit) throws ZkInterruptedException {
     return waitForKeeperState(KeeperState.SyncConnected, time, timeUnit);
   }
 
   public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit timeUnit)
       throws ZkInterruptedException {
-    if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
-      throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
-    }
+    validateCurrentThread();
     Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
 
     LOG.debug("Waiting for keeper state " + keeperState);
@@ -2136,4 +2150,10 @@ public class ZkClient implements Watcher {
       return _listener.hashCode();
     }
   }
+
+  private void validateCurrentThread() {
+    if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
+      throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index fd7bb3c..111bd7a 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -712,4 +712,24 @@ public class TestRawZkClient extends ZkUnitTestBase {
     // of its owner expires.
     _zkClient.delete(path);
   }
+
+  @Test
+  public void testWaitForEstablishedSession() {
+    ZkClient zkClient = new ZkClient(ZK_ADDR);
+    Assert.assertTrue(zkClient.waitForEstablishedSession(1, TimeUnit.SECONDS) != 0L);
+    TestHelper.stopZkServer(_zkServer);
+    Assert.assertTrue(zkClient.waitForKeeperState(KeeperState.Disconnected, 1, TimeUnit.SECONDS));
+
+    try {
+      zkClient.waitForEstablishedSession(3, TimeUnit.SECONDS);
+      Assert.fail("Connecting to zk server should time out and ZkTimeoutException is expected.");
+    } catch (ZkTimeoutException expected) {
+      // Because zk server is shutdown, zkClient should not connect to zk server and a
+      // ZkTimeoutException should be thrown.
+    }
+
+    zkClient.close();
+    // Recover zk server for later tests.
+    _zkServer.start();
+  }
 }


[helix] 03/03: Implement ZkAccessor

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch rest0
in repository https://gitbox.apache.org/repos/asf/helix.git

commit e1f13b81089637f92b7beb5c6bf9b521ec857a4a
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Sat Jan 11 18:25:55 2020 -0800

    Implement ZkAccessor
---
 .../org/apache/helix/rest/common/ServletType.java  | 12 +++-
 .../resources/zookeeper/ZooKeeperAccessor.java     | 82 +++++++++++++++-------
 .../helix/rest/server/TestZooKeeperAccessor.java   |  2 +-
 3 files changed, 68 insertions(+), 28 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java b/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java
index bbff2d6..f068f95 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java
@@ -21,21 +21,27 @@ package org.apache.helix.rest.common;
 
 import org.apache.helix.rest.server.resources.helix.AbstractHelixResource;
 import org.apache.helix.rest.server.resources.metadata.NamespacesAccessor;
+import org.apache.helix.rest.server.resources.zookeeper.ZooKeeperAccessor;
+
 
 public enum ServletType {
   /**
    * Servlet serving default API endpoints (/admin/v2/clusters/...)
    */
   DEFAULT_SERVLET(HelixRestNamespace.DEFAULT_NAMESPACE_PATH_SPEC,
-      new String[] { AbstractHelixResource.class.getPackage().getName(),
-          NamespacesAccessor.class.getPackage().getName()
+      new String[] {
+          AbstractHelixResource.class.getPackage().getName(),
+          NamespacesAccessor.class.getPackage().getName(),
+          ZooKeeperAccessor.class.getPackage().getName()
       }),
 
   /**
    * Servlet serving namespaced API endpoints (/admin/v2/namespaces/{namespaceName})
    */
   COMMON_SERVLET("/namespaces/%s/*",
-      new String[] { AbstractHelixResource.class.getPackage().getName(),
+      new String[] {
+          AbstractHelixResource.class.getPackage().getName(),
+          ZooKeeperAccessor.class.getPackage().getName()
       });
 
   private final String _servletPathSpecTemplate;
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/zookeeper/ZooKeeperAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/zookeeper/ZooKeeperAccessor.java
index 4752168..fc2abe0 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/zookeeper/ZooKeeperAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/zookeeper/ZooKeeperAccessor.java
@@ -19,24 +19,21 @@ package org.apache.helix.rest.server.resources.zookeeper;
  * under the License.
  */
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.helix.AccessOption;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.rest.common.ContextPropertyKeys;
 import org.apache.helix.rest.server.ServerContext;
 import org.apache.helix.rest.server.resources.AbstractResource;
-import org.apache.helix.rest.server.resources.helix.ClusterAccessor;
-import org.codehaus.jackson.node.ObjectNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,42 +45,68 @@ import org.slf4j.LoggerFactory;
 @Path("/zookeeper")
 public class ZooKeeperAccessor extends AbstractResource {
   private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperAccessor.class.getName());
+  private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
 
-  private ServerContext _serverContext =
-      (ServerContext) _application.getProperties().get(ContextPropertyKeys.SERVER_CONTEXT.name());
-  private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor =
-      _serverContext.getByteArrayZkBaseDataAccessor();
+  public enum ZooKeeperCommand {
+    exists, getBinaryData, getStringData, getChildren
+  }
 
   @GET
   @Path("{path: .+}")
-  public Response get(@PathParam("path") String path)
+  public Response get(@PathParam("path") String path, @QueryParam("command") String commandStr) {
+    ZooKeeperCommand cmd;
+    try {
+      cmd = ZooKeeperCommand.valueOf(commandStr);
+    } catch (Exception e) {
+      return badRequest("Invalid ZooKeeper command: " + commandStr);
+    }
+
+    // Lazily initialize ZkBaseDataAccessor
+    ServerContext _serverContext =
+        (ServerContext) _application.getProperties().get(ContextPropertyKeys.SERVER_CONTEXT.name());
+    _zkBaseDataAccessor = _serverContext.getByteArrayZkBaseDataAccessor();
+
+    // Need to prepend a "/" since JAX-RS regex removes it
+    path = "/" + path;
+    switch (cmd) {
+      case exists:
+        return exists(path);
+      case getBinaryData:
+        return getData(path, cmd);
+      case getStringData:
+        return getData(path, cmd);
+      case getChildren:
+        return getChildren(path);
+      default:
+        LOG.error("Unsupported command :" + commandStr);
+        return badRequest("Unsupported command :" + commandStr);
+    }
+  }
 
   /**
    * Checks if a ZNode exists in the given path.
    * @param path
    * @return true if a ZNode exists, false otherwise
    */
-  @GET
-  @Path("exists/{path: .+}")
-  public Response exists(@PathParam("path") String path) {
+  private Response exists(String path) {
     if (!isPathValid(path)) {
       String errMsg = "exists(): The given path {} is not a valid ZooKeeper path!" + path;
       LOG.error(errMsg);
       return badRequest(errMsg);
     }
 
-    boolean exists = _zkBaseDataAccessor.exists(path, AccessOption.PERSISTENT);
-    return JSONRepresentation(exists);
+    Map<String, Boolean> result = ImmutableMap.of(ZooKeeperCommand.exists.name(),
+        _zkBaseDataAccessor.exists(path, AccessOption.PERSISTENT));
+    return JSONRepresentation(result);
   }
 
   /**
    * Reads the given path from ZooKeeper and returns the binary data for the ZNode.
    * @param path
+   * @param command denotes whether return type should be binary or String
    * @return binary data in the ZNode
    */
-  @GET
-  @Path("getData/{path: .+}")
-  public Response getData(@PathParam("path") String path) {
+  private Response getData(String path, ZooKeeperCommand command) {
     if (!isPathValid(path)) {
       String errMsg = "getData(): The given path {} is not a valid ZooKeeper path!" + path;
       LOG.error(errMsg);
@@ -92,7 +115,19 @@ public class ZooKeeperAccessor extends AbstractResource {
 
     if (_zkBaseDataAccessor.exists(path, AccessOption.PERSISTENT)) {
       byte[] bytes = _zkBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-      return JSONRepresentation(bytes);
+      switch (command) {
+        case getBinaryData:
+          Map<String, byte[]> binaryResult =
+              ImmutableMap.of(ZooKeeperCommand.getBinaryData.name(), bytes);
+          return JSONRepresentation(binaryResult);
+        case getStringData:
+          Map<String, String> stringResult =
+              ImmutableMap.of(ZooKeeperCommand.getStringData.name(), new String(bytes));
+          return JSONRepresentation(stringResult);
+        default:
+          LOG.error("Unsupported command :" + command);
+          return badRequest("Unsupported command :" + command);
+      }
     } else {
       throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
           .entity(String.format("The ZNode at path %s does not exist", path)).build());
@@ -104,9 +139,7 @@ public class ZooKeeperAccessor extends AbstractResource {
    * @param path
    * @return list of child ZNodes
    */
-  @GET
-  @Path("getChildren/{path: .+}")
-  public Response getChildren(@PathParam("path") String path) {
+  private Response getChildren(String path) {
     if (!isPathValid(path)) {
       String errMsg = "getChildren(): The given path {} is not a valid ZooKeeper path!" + path;
       LOG.error(errMsg);
@@ -114,8 +147,9 @@ public class ZooKeeperAccessor extends AbstractResource {
     }
 
     if (_zkBaseDataAccessor.exists(path, AccessOption.PERSISTENT)) {
-      List<String> children = _zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT);
-      return JSONRepresentation(children);
+      Map<String, List<String>> result = ImmutableMap.of(ZooKeeperCommand.getChildren.name(),
+          _zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT));
+      return JSONRepresentation(result);
     } else {
       throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
           .entity(String.format("The ZNode at path %s does not exist", path)).build());
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestZooKeeperAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestZooKeeperAccessor.java
index a46f0fd..175f0b2 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestZooKeeperAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestZooKeeperAccessor.java
@@ -67,7 +67,7 @@ public class TestZooKeeperAccessor extends AbstractTestClass {
     Assert.assertTrue(_testBaseDataAccessor.create(path, content.getBytes(), AccessOption.PERSISTENT));
     Assert.assertTrue(_testBaseDataAccessor.exists(path, AccessOption.PERSISTENT));
 
-    String data = new JerseyUriRequestBuilder("zookeeper/exists{}").format(path)
+    String data = new JerseyUriRequestBuilder("zookeeper{}?command=exists").format(path)
         .isBodyReturnExpected(true).get(this);
 
     // Clean up


[helix] 01/03: Fix handleNewSession creating ephemeral node with expired session (#642)

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch rest0
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3f856664a633781b2c618764487b300d646c7f72
Author: Huizhi L <ih...@gmail.com>
AuthorDate: Thu Jan 9 11:24:00 2020 -0800

    Fix handleNewSession creating ephemeral node with expired session (#642)
    
    There is a ZK session race condition in ZKHelixManager: the ZK session may expire and change while a new session is being handled. If the expected session has expired by the time the new session is handled, the ephemeral node should NOT be created.
    
    Change list:
    - Change the create() API in ZkClient to accept a zk sessionId as a parameter. Add a new public API, createEphemeral(final String path, final Object data, final String sessionId), to create an ephemeral node with the expected zk session.
    - Fire new-session events and all other events (child/data change events) right after the first SyncConnected state is received by the zk client.
    - Filter out stale sessions when handling a new session.
    - Add multiple unit tests to cover the cases.
---
 .../helix/manager/zk/ParticipantManager.java       | 109 ++++---
 .../apache/helix/manager/zk/ZKHelixManager.java    |  76 ++++-
 .../java/org/apache/helix/manager/zk/ZKUtil.java   |  10 +
 .../manager/zk/ZkSessionMismatchedException.java   |  18 +
 .../helix/manager/zk/client/HelixZkClient.java     |  14 +
 .../helix/manager/zk/zookeeper/ZkClient.java       | 323 +++++++++++++++++-
 .../test/java/org/apache/helix/ZkTestHelper.java   |   2 +-
 .../helix/manager/zk/TestHandleNewSession.java     | 362 ++++++++++++++++++++-
 .../apache/helix/manager/zk/TestRawZkClient.java   | 255 ++++++++++++++-
 9 files changed, 1079 insertions(+), 90 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 76cb791..2e10f8f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -66,7 +66,6 @@ public class ParticipantManager {
   final PropertyKey.Builder _keyBuilder;
   final String _clusterName;
   final String _instanceName;
-  final String _sessionId;
   final int _sessionTimeout;
   final ConfigAccessor _configAccessor;
   final InstanceType _instanceType;
@@ -77,14 +76,19 @@ public class ParticipantManager {
   final LiveInstanceInfoProvider _liveInstanceInfoProvider;
   final List<PreConnectCallback> _preConnectCallbacks;
 
+  // zk session id should be immutable after participant manager is created. This is to avoid
+  // session race condition when handling new session for the participant.
+  private final String _sessionId;
+
   public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout,
-      LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks) {
+      LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks,
+      final String sessionId) {
     _zkclient = zkclient;
     _manager = manager;
     _clusterName = manager.getClusterName();
     _instanceName = manager.getInstanceName();
     _keyBuilder = new PropertyKey.Builder(_clusterName);
-    _sessionId = manager.getSessionId();
+    _sessionId = sessionId;
     _sessionTimeout = sessionTimeout;
     _configAccessor = manager.getConfigAccessor();
     _instanceType = manager.getInstanceType();
@@ -97,10 +101,23 @@ public class ParticipantManager {
   }
 
   /**
-   * Handle new session for a participang.
+   * Handles a new session for a participant. The new session's id is passed in when participant
+   * manager is created, as it is required to prevent ephemeral node creation from session race
+   * condition: ephemeral node is created by an expired or unexpected session.
+   *
    * @throws Exception
    */
   public void handleNewSession() throws Exception {
+    // Check zk session of this participant is still valid.
+    // If not, skip handling new session for this participant.
+    final String zkClientHexSession = ZKUtil.toHexSessionId(_zkclient.getSessionId());
+    if (!zkClientHexSession.equals(_sessionId)) {
+      throw new HelixException(
+          "Failed to handle new session for participant. There is a session mismatch: "
+              + "participant manager session = " + _sessionId + ", zk client session = "
+              + zkClientHexSession);
+    }
+
     joinCluster();
 
     /**
@@ -112,6 +129,8 @@ public class ParticipantManager {
 
     // TODO create live instance node after all the init works done --JJ
     // This will help to prevent controller from sending any message prematurely.
+    // Live instance creation also checks if the expected session is valid or not. Live instance
+    // should not be created by an expired zk session.
     createLiveInstance();
     carryOverPreviousCurrentState();
 
@@ -183,10 +202,15 @@ public class ParticipantManager {
       retry = false;
       try {
         _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
-        LOG.info("LiveInstance created, path: " + liveInstancePath + ", sessionId: " + liveInstance.getEphemeralOwner());
+        LOG.info("LiveInstance created, path: {}, sessionId: {}", liveInstancePath,
+            liveInstance.getEphemeralOwner());
+      } catch (ZkSessionMismatchedException e) {
+        throw new HelixException(
+            "Failed to create live instance, path: " + liveInstancePath + ". Caused by: "
+                + e.getMessage());
       } catch (ZkNodeExistsException e) {
-        LOG.warn("found another instance with same instanceName: " + _instanceName + " in cluster "
-            + _clusterName);
+        LOG.warn("Found another instance with same instance name: {} in cluster: {}", _instanceName,
+            _clusterName);
 
         Stat stat = new Stat();
         ZNRecord record = _zkclient.readData(liveInstancePath, stat, true);
@@ -196,41 +220,20 @@ public class ParticipantManager {
            */
           retry = true;
         } else {
-          String ephemeralOwner = Long.toHexString(stat.getEphemeralOwner());
-          if (ephemeralOwner.equals(_sessionId)) {
-            /**
-             * update sessionId field in live-instance if necessary
-             */
-            LiveInstance curLiveInstance = new LiveInstance(record);
-            if (!curLiveInstance.getEphemeralOwner().equals(_sessionId)) {
-              /**
-               * in last handle-new-session,
-               * live-instance is created by new zkconnection with stale session-id inside
-               * just update session-id field
-               */
-              LOG.info("overwriting session-id by ephemeralOwner: " + ephemeralOwner
-                  + ", old-sessionId: " + curLiveInstance.getEphemeralOwner() + ", new-sessionId: "
-                  + _sessionId);
-
-              curLiveInstance.setSessionId(_sessionId);
-              _zkclient.writeData(liveInstancePath, curLiveInstance.getRecord());
-            }
-          } else {
-            /**
-             * wait for a while, in case previous helix-participant exits unexpectedly
-             * and its live-instance still hangs around until session timeout
-             */
-            try {
-              TimeUnit.MILLISECONDS.sleep(_sessionTimeout + 5000);
-            } catch (InterruptedException ex) {
-              LOG.warn("Sleep interrupted while waiting for previous live-instance to go away.", ex);
-            }
-            /**
-             * give a last try after exit while loop
-             */
-            retry = true;
-            break;
+          /**
+           * wait for a while, in case previous helix-participant exits unexpectedly
+           * and its live-instance still hangs around until session timeout
+           */
+          try {
+            TimeUnit.MILLISECONDS.sleep(_sessionTimeout + 5000);
+          } catch (InterruptedException ex) {
+            LOG.warn("Sleep interrupted while waiting for previous live-instance to go away.", ex);
           }
+          /**
+           * give a last try after exit while loop
+           */
+          retry = true;
+          break;
         }
       }
     } while (retry);
@@ -241,26 +244,24 @@ public class ParticipantManager {
     if (retry) {
       try {
         _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
-        LOG.info("LiveInstance created, path: " + liveInstancePath + ", sessionId: " + liveInstance
-            .getEphemeralOwner());
+        LOG.info("LiveInstance created, path: {}, sessionId: {}", liveInstancePath,
+            liveInstance.getEphemeralOwner());
+      } catch (ZkSessionMismatchedException e) {
+        throw new HelixException(
+            "Failed to create live instance, path: " + liveInstancePath + ". Caused by: "
+                + e.getMessage());
+      } catch (ZkNodeExistsException e) {
+        throw new HelixException("Failed to create live instance because instance: " + _instanceName
+            + " already has a live-instance in cluster: " + _clusterName + ". Path is: "
+            + liveInstancePath);
       } catch (Exception e) {
-        String errorMessage =
-            "instance: " + _instanceName + " already has a live-instance in cluster "
-                + _clusterName;
-        LOG.error(errorMessage);
-        throw new HelixException(errorMessage);
+        throw new HelixException("Failed to create live instance. " + e.getMessage());
       }
     }
 
     ParticipantHistory history = getHistory();
     history.reportOnline(_sessionId, _manager.getVersion());
     persistHistory(history);
-
-    if (!liveInstance.getEphemeralOwner().equals(liveInstance.getSessionId())) {
-      LOG.warn(
-          "Session ID {} (Deprecated) in the znode does not match the Ephemeral Owner session ID {}. Will use the Ephemeral Owner session ID.",
-          liveInstance.getSessionId(), liveInstance.getEphemeralOwner());
-    }
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index df2dccc..6fabca6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -32,6 +32,7 @@ import javax.management.JMException;
 
 import com.google.common.collect.Sets;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
@@ -88,6 +89,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class ZKHelixManager implements HelixManager, IZkStateListener {
   private static Logger LOG = LoggerFactory.getLogger(ZKHelixManager.class);
 
@@ -697,10 +699,19 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     int retryCount = 0;
     while (retryCount < 3) {
       try {
-        // TODO: synchronize this block and wait for the new non-zero session ID updated.
-        _zkclient.waitUntilConnected(_connectionInitTimeout, TimeUnit.MILLISECONDS);
+        // TODO: get session id from waitUntilConnected to avoid race condition
+        if (!_zkclient.waitUntilConnected(_connectionInitTimeout, TimeUnit.MILLISECONDS)) {
+          throw new ZkTimeoutException(
+              "Unable to connect to zookeeper server within timeout: " + _connectionInitTimeout
+                  + " ms.");
+        }
         handleStateChanged(KeeperState.SyncConnected);
-        handleNewSession();
+        /*
+         * This listener is subscribed after SyncConnected and firing new session events,
+         * which means this listener has not yet handled new session, so we have to handle new
+         * session here just for this listener.
+         */
+        handleNewSession(ZKUtil.toHexSessionId(_zkclient.getSessionId()));
         break;
       } catch (HelixException e) {
         LOG.error("fail to createClient.", e);
@@ -823,6 +834,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   @Override
   public String getSessionId() {
     checkConnected(_waitForConnectedTimeout);
+    // TODO: session id should be updated after zk client is connected.
+    // Otherwise, this session id might be an expired one.
     return _sessionId;
   }
 
@@ -980,7 +993,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         continue;
       }
 
-      _sessionId = Long.toHexString(_zkclient.getSessionId());
+      _sessionId = ZKUtil.toHexSessionId(_zkclient.getSessionId());
 
       /**
        * at the time we read session-id, zkconnection might be lost again
@@ -1099,7 +1112,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   }
 
   /**
-   * Called after the zookeeper session has expired and a new session has been created. This method
+   * Called after zookeeper session has expired and a new session has been established. This method
    * may cause session race condition when creating ephemeral nodes. Internally, this method calls
    * {@link #handleNewSession(String)} with a null value as the sessionId parameter, which results
    * in later creating the ephemeral node in the session of the latest zk connection.
@@ -1121,10 +1134,22 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     handleNewSession(null);
   }
 
+  /**
+   * Called after the zookeeper session has expired and a new session has been established. This
+   * method handles a new session with its session id passed in. Before handling, it waits until
+   * the zk client is connected and has a non-zero (current actual) session id. If a non-null
+   * passed-in (expected) session id does not match the current session id, the expected session
+   * is treated as expired and is NOT handled. A null session id falls back to the current one.
+   *
+   * @param sessionId the new session's id, or null. The ephemeral nodes are expected to be created
+   *                  in this session. If this session id is expired, they should not be created.
+   * @throws Exception if any error occurs during handling new session
+   */
   @Override
-  public void handleNewSession(final String sessionId) throws Exception {
+  public void handleNewSession(String sessionId) throws Exception {
     /*
-     * TODO: after removing I0ItecIZkStateListenerHelixImpl, null session should be checked and discarded.
+     * TODO: after removing I0ItecIZkStateListenerHelixImpl, null session should be checked and
+     *  discarded.
      * Null session is still a special case here, which is treated as non-session aware operation.
      * This special case could still potentially cause race condition, so null session should NOT
      * be acceptable, once I0ItecIZkStateListenerHelixImpl is removed. Currently this special case
@@ -1134,9 +1159,31 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     // Wait until we get a non-zero session id. Otherwise, getSessionId() might be null.
     waitUntilConnected();
 
-    // TODO: filter out stale sessions here.
+    /*
+     * Filter out stale sessions. If a session id is not null and not the same as current session
+     * id, this session is expired. With this filtering, expired sessions are NOT handled,
+     * so performance is expected to improve.
+     */
+    // Read the current session id once so the comparison, the log and the fallback agree.
+    final String currentSessionId = getSessionId();
+    if (sessionId != null && !currentSessionId.equals(sessionId)) {
+      LOG.warn("Session is expired and not handled. Expected: {}. Actual: {}.", sessionId,
+          currentSessionId);
+      return;
+    }
+
+    /*
+     * A null session id falls back to the current session's id, for backward compatibility with
+     * the old no-arg handleNewSession(). The current session might not be the one we expect to
+     * handle: the expected one might have already expired while the zk event waited in the queue.
+     */
+    if (sessionId == null) {
+      sessionId = currentSessionId;
+      LOG.debug("Session id: <null> is passed in. Current session id: {} will be used.", sessionId);
+    }
+
     LOG.info("Handle new session, instance: {}, type: {}, session id: {}.", _instanceName,
-        _instanceType, sessionId == null ? "None" : sessionId);
+        _instanceType, sessionId);
 
     /**
      * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session
@@ -1164,13 +1211,13 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
     switch (_instanceType) {
     case PARTICIPANT:
-      handleNewSessionAsParticipant();
+      handleNewSessionAsParticipant(sessionId);
       break;
     case CONTROLLER:
       handleNewSessionAsController();
       break;
     case CONTROLLER_PARTICIPANT:
-      handleNewSessionAsParticipant();
+      handleNewSessionAsParticipant(sessionId);
       handleNewSessionAsController();
       break;
     case ADMINISTRATOR:
@@ -1197,16 +1244,19 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     }
   }
 
-  void handleNewSessionAsParticipant() throws Exception {
+  void handleNewSessionAsParticipant(final String sessionId) throws Exception {
     if (_participantManager != null) {
       _participantManager.reset();
     }
     _participantManager =
         new ParticipantManager(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
-            _preConnectCallbacks);
+            _preConnectCallbacks, sessionId);
+
     _participantManager.handleNewSession();
   }
 
+  // TODO: pass in session id and make this method session aware to avoid potential session race
+  //  condition.
   void handleNewSessionAsController() {
     if (_leaderElectionHandler != null) {
       _leaderElectionHandler.init();
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index bd98c15..821af52 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -581,6 +581,16 @@ public final class ZKUtil {
   }
 
   /**
+   * Converts a long zookeeper session id to its hexadecimal string notation, e.g.
+   * 1000a5ceb930004, using {@link Long#toHexString(long)} (no "0x" prefix, no zero padding).
+   * @param sessionId session id as returned by a zookeeper client's getSessionId()
+   * @return String representation of session id in hexadecimal notation.
+   */
+  public static String toHexSessionId(long sessionId) {
+    return Long.toHexString(sessionId);
+  }
+
+  /**
    * Returns a dedicated ZkClient.
    * @return
    */
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkSessionMismatchedException.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkSessionMismatchedException.java
new file mode 100644
index 0000000..e336bb6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkSessionMismatchedException.java
@@ -0,0 +1,18 @@
+package org.apache.helix.manager.zk;
+
+import org.I0Itec.zkclient.exception.ZkException;
+
+
+/**
+ * Exception thrown when an action is taken by an expected zk session which
+ * does not match the actual zk session, e.g. a session aware ephemeral node
+ * creation whose expected session id differs from the current session id.
+ */
+public class ZkSessionMismatchedException extends ZkException {
+
+  private static final long serialVersionUID = 1L;
+
+  public ZkSessionMismatchedException(String message) {
+    super(message);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
index d735874..f5ed25a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
@@ -97,20 +97,34 @@ public interface HelixZkClient {
 
   void createEphemeral(final String path);
 
+  void createEphemeral(final String path, final String sessionId);
+
   void createEphemeral(final String path, final List<ACL> acl);
 
+  void createEphemeral(final String path, final List<ACL> acl, final String sessionId);
+
   String create(final String path, Object data, final CreateMode mode);
 
   String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode);
 
   void createEphemeral(final String path, final Object data);
 
+  void createEphemeral(final String path, final Object data, final String sessionId);
+
   void createEphemeral(final String path, final Object data, final List<ACL> acl);
 
+  void createEphemeral(final String path, final Object data, final List<ACL> acl,
+      final String sessionId);
+
   String createEphemeralSequential(final String path, final Object data);
 
   String createEphemeralSequential(final String path, final Object data, final List<ACL> acl);
 
+  String createEphemeralSequential(final String path, final Object data, final String sessionId);
+
+  String createEphemeralSequential(final String path, final Object data, final List<ACL> acl,
+      final String sessionId);
+
   List<String> getChildren(String path);
 
   int countChildren(String path);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index e035d14..94bccbb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -42,7 +42,9 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.manager.zk.BasicZkSerializer;
 import org.apache.helix.manager.zk.PathBasedZkSerializer;
+import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks;
+import org.apache.helix.manager.zk.ZkSessionMismatchedException;
 import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.helix.util.ExponentialBackoffStrategy;
@@ -63,6 +65,7 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on
  * nodes in ZooKeeper.
@@ -80,6 +83,16 @@ public class ZkClient implements Watcher {
   private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<>();
   private KeeperState _currentState;
   private final ZkLock _zkEventLock = new ZkLock();
+
+  // When a new zookeeper instance is created in reconnect, its session id is not yet valid before
+  // the zookeeper session is established(SyncConnected). To avoid session race condition in
+  // handling new session, the new session event is only fired after SyncConnected. Meanwhile,
+  // SyncConnected state is also received when re-opening the zk connection. So to avoid firing
+  // new session event more than once, this flag is used to check.
+  // It is set to false right after the new zookeeper instance is created in reconnect before the
+  // session is established. And set it to true once the new session event is fired the first time.
+  private boolean _isNewSessionEventFired;
+
   private boolean _shutdownTriggered;
   private ZkEventThread _eventThread;
   // TODO PVo remove this later
@@ -173,6 +186,7 @@ public class ZkClient implements Watcher {
     _connection = zkConnection;
     _pathBasedZkSerializer = zkSerializer;
     _operationRetryTimeoutInMillis = operationRetryTimeout;
+    _isNewSessionEventFired = false;
 
     connect(connectionTimeout, this);
 
@@ -493,6 +507,28 @@ public class ZkClient implements Watcher {
   }
 
   /**
+   * Creates an ephemeral node. This ephemeral node is created by the expected(passed-in) ZK session.
+   * If the expected session does not match the current ZK session, the node will not be created.
+   * The node has no data and uses the open ACL {@link ZooDefs.Ids#OPEN_ACL_UNSAFE}.
+   *
+   * @param path path of the node
+   * @param sessionId expected session id of the ZK connection. If the session id of current ZK
+   *                  connection does not match the expected session id, ephemeral creation will
+   *                  fail.
+   * @throws ZkInterruptedException if operation or a required reconnection was interrupted
+   * @throws IllegalArgumentException
+   *           if called from anything except the ZooKeeper event thread
+   * @throws ZkException if any ZooKeeper exception occurred
+   * @throws RuntimeException if any other exception occurs
+   */
+  public void createEphemeral(final String path, final String sessionId)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    // Not createEphemeral(path, null, sessionId): a null literal resolves to the more specific
+    // List<ACL> overload and would pass a null ACL. Use the same open ACL as the other overloads.
+    create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, sessionId);
+  }
+
+  /**
    * Create an ephemeral node and set its ACL.
    * @param path
    * @param acl
@@ -511,6 +547,30 @@ public class ZkClient implements Watcher {
   }
 
   /**
+   * Creates an ephemeral node with no data and sets its ACL. This ephemeral node is created by the
+   * expected(passed-in) ZK session. If the expected session does not match the current ZK session,
+   * the node will not be created.
+   * Note: a {@code null} literal as the second argument resolves to this overload rather than the
+   * {@code Object data} one, i.e. it is passed on as a null ACL.
+   *
+   * @param path path of the ephemeral node
+   * @param acl a list of ACL for the ephemeral node.
+   * @param sessionId expected session id of the ZK connection. If the session id of current ZK
+   *                  connection does not match the expected session id, ephemeral creation will
+   *                  fail.
+   * @throws ZkInterruptedException if operation or a required reconnection was interrupted
+   * @throws IllegalArgumentException
+   *           if called from anything except the ZooKeeper event thread
+   * @throws ZkException if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *           if any other exception occurs
+   */
+  public void createEphemeral(final String path, final List<ACL> acl, final String sessionId)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    create(path, null, acl, CreateMode.EPHEMERAL, sessionId);
+  }
+
+  /**
    * Create a node.
    * @param path
    * @param data
@@ -548,6 +608,34 @@ public class ZkClient implements Watcher {
    */
   public String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode)
       throws IllegalArgumentException, ZkException {
+    return create(path, datat, acl, mode, null);
+  }
+
+  /**
+   * Creates a node and returns the actual path of the created node.
+   *
+   * The operation is session aware only when the expected session id is non-null and non-empty
+   * AND the mode is EPHEMERAL or EPHEMERAL_SEQUENTIAL. In that case, if the node is successfully
+   * created, it is guaranteed to be created in the expected(passed-in) session.
+   *
+   * If the expected session is expired, i.e. it does not match the current session of the ZK
+   * connection, the session check raises a {@link ZkSessionMismatchedException} and the node is
+   * not created. For non-ephemeral modes the expected session id is ignored.
+   *
+   * @param path the path where you want the node to be created
+   * @param dataObject data of the node
+   * @param acl list of ACL for the node
+   * @param mode {@link CreateMode} of the node
+   * @param expectedSessionId the expected session ID of the ZK connection. It is not necessarily the
+   *                  session ID of current ZK Connection. Only honored for ephemeral modes. If it
+   *                  is null or empty, the create operation is NOT session aware.
+   * @return path of the node created
+   * @throws IllegalArgumentException if called from anything else except the ZooKeeper event thread
+   * @throws ZkException if any zookeeper exception occurs
+   */
+  private String create(final String path, final Object dataObject, final List<ACL> acl,
+      final CreateMode mode, final String expectedSessionId)
+      throws IllegalArgumentException, ZkException {
     if (path == null) {
       throw new NullPointerException("Path must not be null.");
     }
@@ -556,15 +644,46 @@ public class ZkClient implements Watcher {
     }
     long startT = System.currentTimeMillis();
     try {
-      final byte[] data = datat == null ? null : serialize(datat, path);
-      checkDataSizeLimit(data);
-      String actualPath = retryUntilConnected(new Callable<String>() {
-        @Override
-        public String call() throws Exception {
-          return getConnection().create(path, data, acl, mode);
+      final byte[] dataBytes = dataObject == null ? null : serialize(dataObject, path);
+      checkDataSizeLimit(dataBytes);
+
+      final String actualPath = retryUntilConnected(() -> {
+        ZooKeeper zooKeeper = ((ZkConnection) getConnection()).getZookeeper();
+
+        /*
+         * 1. If operation is session aware, we have to check whether or not the
+         * passed-in(expected) session id matches actual session's id.
+         * If not, ephemeral node creation is failed. This validation is
+         * critical to guarantee the ephemeral node created by the expected ZK session.
+         *
+         * 2. Otherwise, the operation is NOT session aware.
+         * In this case, we will use the actual zookeeper session to create the node.
+         */
+        if (isSessionAwareOperation(expectedSessionId, mode)) {
+          acquireEventLock();
+          try {
+            /*
+             * Re-read the zookeeper reference under the event lock and validate the session id on
+             * that same instance, which is then used by zooKeeper.create() below. Checking one
+             * instance and creating with another could place the node in a different session.
+             * zooKeeper.create() itself is not locked to avoid potential performance issues.
+             */
+            zooKeeper = ((ZkConnection) getConnection()).getZookeeper();
+            final String actualSessionId = ZKUtil.toHexSessionId(zooKeeper.getSessionId());
+            if (!actualSessionId.equals(expectedSessionId)) {
+              throw new ZkSessionMismatchedException(
+                  "Failed to create ephemeral node! There is a session id mismatch. Expected: "
+                      + expectedSessionId + ". Actual: " + actualSessionId);
+            }
+          } finally {
+            getEventLock().unlock();
+          }
         }
+
+        return zooKeeper.create(path, dataBytes, acl, mode);
       });
-      record(path, data, startT, ZkClientMonitor.AccessType.WRITE);
+
+      record(path, dataBytes, startT, ZkClientMonitor.AccessType.WRITE);
       return actualPath;
     } catch (Exception e) {
       recordFailure(path, ZkClientMonitor.AccessType.WRITE);
@@ -596,6 +715,33 @@ public class ZkClient implements Watcher {
   }
 
   /**
+   * Creates an ephemeral node. Given an expected non-null session id, if the ephemeral
+   * node is successfully created, it is guaranteed to be in the expected(passed-in) session.
+   *
+   * If the expected session is expired, which means the expected session does not match the session
+   * of current ZK connection, the ephemeral node will not be created.
+   * If connection is timed out or interrupted, exception is thrown.
+   *
+   * @param path path of the ephemeral node being created
+   * @param data data of the ephemeral node being created
+   * @param sessionId the expected session ID of the ZK connection. It is not necessarily the
+   *                  session ID of current ZK Connection. If the expected session ID is NOT null,
+   *                  the node is guaranteed to be created in the expected session, or creation is
+   *                  failed if the expected session id doesn't match current connected zk session.
+   *                  If the session id is null, it means the operation is NOT session aware
+   *                  and the node will be created by current ZK session.
+   * @throws ZkInterruptedException if operation is interrupted, or a required reconnection gets
+   *         interrupted
+   * @throws IllegalArgumentException if called from anything except the ZooKeeper event thread
+   * @throws ZkException if any ZooKeeper exception occurs
+   * @throws RuntimeException if any other exception occurs
+   */
+  public void createEphemeral(final String path, final Object data, final String sessionId)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, sessionId);
+  }
+
+  /**
    * Create an ephemeral node.
    * @param path
    * @param data
@@ -615,6 +761,37 @@ public class ZkClient implements Watcher {
   }
 
   /**
+   * Creates an ephemeral node in an expected ZK session. Given an expected non-null session id,
+   * if the ephemeral node is successfully created, it is guaranteed to be in the expected session.
+   * If the expected session is expired, which means the expected session does not match the session
+   * of current ZK connection, the ephemeral node will not be created.
+   * If connection is timed out or interrupted, exception is thrown.
+   *
+   * @param path path of the ephemeral node being created
+   * @param data data of the ephemeral node being created
+   * @param acl list of ACL for the ephemeral node
+   * @param sessionId the expected session ID of the ZK connection. It is not necessarily the
+   *                  session ID of current ZK Connection. If the expected session ID is NOT null,
+   *                  the node is guaranteed to be created in the expected session, or creation is
+   *                  failed if the expected session id doesn't match current connected zk session.
+   *                  If the session id is null, it means the create operation is NOT session aware
+   *                  and the node will be created by current ZK session.
+   * @throws ZkInterruptedException
+   *           if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *           if called from anything except the ZooKeeper event thread
+   * @throws ZkException
+   *           if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *           if any other exception occurs
+   */
+  public void createEphemeral(final String path, final Object data, final List<ACL> acl,
+      final String sessionId)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    create(path, data, acl, CreateMode.EPHEMERAL, sessionId);
+  }
+
+  /**
    * Create an ephemeral, sequential node.
    * @param path
    * @param data
@@ -634,6 +811,65 @@ public class ZkClient implements Watcher {
   }
 
   /**
+   * Creates an ephemeral, sequential node with ACL in an expected ZK session.
+   * Given an expected non-null session id, if the ephemeral node is successfully created,
+   * it is guaranteed to be in the expected session.
+   * If the expected session is expired, which means the expected session does not match the session
+   * of current ZK connection, the ephemeral node will not be created.
+   * If connection is timed out or interrupted, exception is thrown.
+   *
+   * @param path path of the node
+   * @param data data of the node
+   * @param acl list of ACL for the node
+   * @param sessionId the expected session ID of the ZK connection. If it is non-null and
+   *                  non-empty, the node is guaranteed to be created in the expected session, or
+   *                  creation fails if it doesn't match the current connected zk session.
+   *                  Otherwise the operation is NOT session aware and uses the current ZK session.
+   * @return created path
+   * @throws ZkInterruptedException if operation or a required reconnection was interrupted
+   * @throws IllegalArgumentException if called from anything except the ZooKeeper event thread
+   * @throws ZkException if any ZooKeeper exception occurred
+   * @throws RuntimeException if any other exception occurs
+   */
+  public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl,
+      final String sessionId)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, sessionId);
+  }
+
+  /**
+   * Creates an ephemeral, sequential node. Given an expected non-null session id,
+   * if the ephemeral node is successfully created, it is guaranteed to be in the expected session.
+   * If the expected session is expired, which means the expected session does not match the session
+   * of current ZK connection, the ephemeral node will not be created.
+   * If connection is timed out or interrupted, exception is thrown.
+   *
+   * @param path path of the node
+   * @param data data of the node
+   * @param sessionId the expected session ID of the ZK connection. It is not necessarily the
+   *                  session ID of current ZK Connection. If the expected session ID is NOT null,
+   *                  the node is guaranteed to be created in the expected session, or creation is
+   *                  failed if the expected session id doesn't match current connected zk session.
+   *                  If the session id is null, it means the create operation is NOT session aware
+   *                  and the node will be created by current ZK session.
+   * @return created path
+   * @throws ZkInterruptedException
+   *           if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *           if called from anything except the ZooKeeper event thread
+   * @throws ZkException
+   *           if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *           if any other exception occurs
+   */
+  public String createEphemeralSequential(final String path, final Object data,
+      final String sessionId)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
+        sessionId);
+  }
+
+  /**
    * Create an ephemeral, sequential node with ACL.
    * @param path
    * @param data
@@ -703,8 +939,6 @@ public class ZkClient implements Watcher {
         if (event.getState() == KeeperState.Expired) {
           getEventLock().getZNodeEventCondition().signalAll();
           getEventLock().getDataChangedCondition().signalAll();
-          // We also have to notify all listeners that something might have changed
-          fireAllEvents();
         }
       }
       if (znodeChanged) {
@@ -835,8 +1069,37 @@ public class ZkClient implements Watcher {
     if (getShutdownTrigger()) {
       return;
     }
+
     fireStateChangedEvent(event.getState());
-    if (isManagingZkConnection() && event.getState() == KeeperState.Expired) {
+
+    if (!isManagingZkConnection()) {
+      return;
+    }
+
+    if (event.getState() == KeeperState.SyncConnected) {
+      if (!_isNewSessionEventFired && !"0".equals(getHexSessionId())) {
+        /*
+         * Before the new zookeeper instance is connected to the zookeeper service and its session
+         * is established, its session id is 0.
+         * New session event is not fired until the new zookeeper session receives the first
+         * SyncConnected state(the zookeeper session is established).
+         * Now the session id is available and non-zero, and we can fire new session events.
+         */
+        fireNewSessionEvents();
+        /*
+         * Set it true to avoid firing events again for the same session next time
+         * when SyncConnected events are received.
+         */
+        _isNewSessionEventFired = true;
+
+        /*
+         * With this first SyncConnected state, we just get connected to zookeeper service after
+         * reconnecting when the session expired. Because previous session expired, we also have to
+         * notify all listeners that something might have changed.
+         */
+        fireAllEvents();
+      }
+    } else if (event.getState() == KeeperState.Expired) {
       reconnectOnExpiring();
     }
   }
@@ -850,7 +1113,6 @@ public class ZkClient implements Watcher {
     while (!isClosed()) {
       try {
         reconnect();
-        fireNewSessionEvents();
         return;
       } catch (ZkInterruptedException interrupt) {
         reconnectException = interrupt;
@@ -878,6 +1140,7 @@ public class ZkClient implements Watcher {
     try {
       ZkConnection connection = ((ZkConnection) getConnection());
       connection.reconnect(this);
+      _isNewSessionEventFired = false;
     } catch (InterruptedException e) {
       throw new ZkInterruptedException(e);
     } finally {
@@ -886,19 +1149,20 @@ public class ZkClient implements Watcher {
   }
 
   private void fireNewSessionEvents() {
+    final String sessionId = getHexSessionId();
     for (final IZkStateListener stateListener : _stateListener) {
-      _eventThread.send(new ZkEvent("New session event sent to " + stateListener) {
+      _eventThread.send(new ZkEvent("New session event sent to " + stateListener, sessionId) {
 
         @Override
         public void run() throws Exception {
-          stateListener.handleNewSession(null);
+          stateListener.handleNewSession(sessionId);
         }
       });
     }
   }
 
   protected void fireStateChangedEvent(final KeeperState state) {
-    final String sessionId = Long.toHexString(getSessionId());
+    final String sessionId = getHexSessionId();
     for (final IZkStateListener stateListener : _stateListener) {
       final String description = "State changed to " + state + " sent to " + stateListener;
       _eventThread.send(new ZkEvent(description, sessionId) {
@@ -1166,6 +1430,12 @@ public class ZkClient implements Watcher {
     }
     try {
       while (true) {
+        // Because ConnectionLossException and SessionExpiredException are caught but not thrown,
+        // we don't know what causes retry. This is used to record which one of the two exceptions
+        // causes retry in ZkTimeoutException.
+        // This also helps the test testConnectionLossWhileCreateEphemeral.
+        KeeperException.Code retryCauseCode;
+
         if (isClosed()) {
           throw new IllegalStateException("ZkClient already closed!");
         }
@@ -1178,13 +1448,17 @@ public class ZkClient implements Watcher {
           }
           return callable.call();
         } catch (ConnectionLossException e) {
+          retryCauseCode = e.code();
           // we give the event thread some time to update the status to 'Disconnected'
           Thread.yield();
           waitForRetry();
         } catch (SessionExpiredException e) {
+          retryCauseCode = e.code();
           // we give the event thread some time to update the status to 'Expired'
           Thread.yield();
           waitForRetry();
+        } catch (ZkSessionMismatchedException e) {
+          throw e;
         } catch (KeeperException e) {
           throw ZkException.create(e);
         } catch (InterruptedException e) {
@@ -1195,7 +1469,8 @@ public class ZkClient implements Watcher {
         // before attempting a retry, check whether retry timeout has elapsed
         if (System.currentTimeMillis() - operationStartTime > _operationRetryTimeoutInMillis) {
           throw new ZkTimeoutException("Operation cannot be retried because of retry timeout ("
-              + _operationRetryTimeoutInMillis + " milli seconds)");
+              + _operationRetryTimeoutInMillis + " milli seconds). Retry was caused by "
+              + retryCauseCode);
         }
       }
     } finally {
@@ -1757,6 +2032,24 @@ public class ZkClient implements Watcher {
     }
   }
 
+  /*
+   * Gets a session id in hexadecimal notation.
+   * Ex. 1000a5ceb930004 is returned.
+   */
+  private String getHexSessionId() {
+    return ZKUtil.toHexSessionId(getSessionId());
+  }
+
+  /*
+   * Session aware operation needs below requirements:
+   * 1. the session id is NOT null or empty
+   * 2. create mode is EPHEMERAL or EPHEMERAL_SEQUENTIAL
+   */
+  private boolean isSessionAwareOperation(String expectedSessionId, CreateMode mode) {
+    return expectedSessionId != null && !expectedSessionId.isEmpty()
+        && (mode == CreateMode.EPHEMERAL || mode == CreateMode.EPHEMERAL_SEQUENTIAL);
+  }
+
   // operations to update monitor's counters
   private void record(String path, byte[] data, long startTimeMilliSec,
       ZkClientMonitor.AccessType accessType) {
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index 8a5b7fe..5b99fa4 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -202,7 +202,7 @@ public class ZkTestHelper {
 
     String newSessionId = Long.toHexString(curZookeeper.getSessionId());
     LOG.info("After session expiry. sessionId: " + newSessionId + ", zk: " + curZookeeper);
-    Assert.assertNotSame(newSessionId, oldSessionId, "Fail to expire current session, zk: "
+    Assert.assertFalse(newSessionId.equals(oldSessionId), "Fail to expire current session, zk: "
         + curZookeeper);
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
index ef98be9..8cae2f7 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
@@ -22,7 +22,9 @@ package org.apache.helix.manager.zk;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
@@ -32,7 +34,7 @@ import org.apache.helix.ZkTestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.zookeeper.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.LiveInstance;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -98,8 +100,9 @@ public class TestHandleNewSession extends ZkTestBase {
 
     // Create controller leader
     final String controllerName = "controller_0";
-    final BlockingZkHelixManager manager =
-        new BlockingZkHelixManager(clusterName, controllerName, InstanceType.CONTROLLER, ZK_ADDR);
+    final BlockingHandleNewSessionZkHelixManager manager =
+        new BlockingHandleNewSessionZkHelixManager(clusterName, controllerName,
+            InstanceType.CONTROLLER, ZK_ADDR);
     GenericHelixController controller0 = new GenericHelixController();
     DistributedLeaderElection election =
         new DistributedLeaderElection(manager, controller0, Collections.EMPTY_LIST);
@@ -172,10 +175,303 @@ public class TestHandleNewSession extends ZkTestBase {
     TestHelper.dropCluster(clusterName, _gZkClient);
   }
 
-  class BlockingZkHelixManager extends ZKHelixManager {
+  /*
+   * Tests session expiry before calling ZkHelixManager.handleNewSession(sessionId).
+   * This test checks to see if the expired sessions would be discarded and the operation would
+   * be returned in handleNewSession. The live instance is only created by the latest session.
+   * This test does not handle new sessions until creating 2 expired session events, which simulates
+   * a long backlog in the event queue. At that time, the first new session is already expired and
+   * should be discarded. The live instance is only created by the second new session.
+   * Set test timeout to 5 minutes, just in case zk server is dead and the test is hung.
+   */
+  @Test(timeOut = 5 * 60 * 1000L)
+  public void testDiscardExpiredSessions() throws Exception {
+    final String className = TestHelper.getTestClassName();
+    final String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    final ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(ZK_ADDR));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR,
+        12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave",
+        true); // do rebalance
+
+    final String instanceName = "localhost_12918";
+    final BlockingHandleNewSessionZkHelixManager manager =
+        new BlockingHandleNewSessionZkHelixManager(clusterName, instanceName,
+            InstanceType.PARTICIPANT, ZK_ADDR);
+
+    manager.connect();
+
+    final String originalSessionId = manager.getSessionId();
+    final LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+    final long originalLiveInstanceCreationTime = liveInstance.getStat().getCreationTime();
+
+    // Verify current live instance.
+    Assert.assertNotNull(liveInstance);
+    Assert.assertEquals(liveInstance.getEphemeralOwner(), originalSessionId);
+
+    final int handlerCount = manager.getHandlers().size();
+    final long originalNewSessionStartTime = manager.getHandleNewSessionStartTime();
+
+    /*
+     * Create 2 expired session events. Followed by the expired sessions, there will be 2 new
+     * sessions(S1, S2) created: S0(original) expired -> S1 created -> S1 expired -> S2 created.
+     * Session S1 would not create a live instance. Instead, only S2 creates a live instance.
+     */
+    for (int i = 0; i < 2; i++) {
+      final String lastSessionId = ZKUtil.toHexSessionId(manager.getZkClient().getSessionId());
+      try {
+        // Lock zk event processing to simulate a long backlog queue.
+        ((ZkClient) manager.getZkClient()).getEventLock().lockInterruptibly();
+
+        // Async expire the session and create a new session.
+        ZkTestHelper.asyncExpireSession(manager.getZkClient());
+
+        // Wait and verify the zookeeper is alive.
+        Assert.assertTrue(TestHelper.verify(
+            () -> !((ZkClient) manager.getZkClient()).getConnection().getZookeeperState().isAlive(),
+            3000L));
+      } finally {
+        // Unlock to start processing event again.
+        ((ZkClient) manager.getZkClient()).getEventLock().unlock();
+      }
+
+      // Wait until the ZkClient has got a new session.
+      Assert.assertTrue(TestHelper.verify(() -> {
+        try {
+          final String sessionId = ZKUtil.toHexSessionId(manager.getZkClient().getSessionId());
+          return !"0".equals(sessionId) && !sessionId.equals(lastSessionId);
+        } catch (HelixException ex) {
+          return false;
+        }
+      }, 2000L));
+
+      // Ensure that the manager has not processed the new session event yet.
+      Assert.assertEquals(manager.getHandleNewSessionStartTime(), originalNewSessionStartTime);
+    }
+
+    // Start to handle all new sessions.
+    for (int i = 0; i < 2; i++) {
+      // The live instance is gone and should NOT be created by the expired session.
+      Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
+
+      final long lastEndTime = manager.getHandleNewSessionEndTime();
+
+      // Proceed the new session handling, so the manager will
+      // get the second new session and process it.
+      manager.proceedNewSessionHandling();
+
+      // Wait for handling new session to complete.
+      Assert.assertTrue(
+          TestHelper.verify(() -> manager.getHandleNewSessionEndTime() > lastEndTime, 2000L));
+    }
+
+    // From now on, the live instance is created.
+    // The latest(the final new one) session id that is valid.
+    final String latestSessionId = ZKUtil.toHexSessionId(manager.getZkClient().getSessionId());
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      // Newly created live instance should be created by the latest session
+      // and have a new creation time.
+      LiveInstance newLiveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+      return newLiveInstance != null
+          && newLiveInstance.getStat().getCreationTime() != originalLiveInstanceCreationTime
+          && newLiveInstance.getEphemeralOwner().equals(latestSessionId);
+    }, 2000L));
+
+    // All the callback handlers shall be recovered.
+    Assert.assertTrue(TestHelper.verify(() -> manager.getHandlers().size() == handlerCount, 1000L));
+    Assert.assertTrue(manager.getHandlers().stream().allMatch(CallbackHandler::isReady));
+
+    // Clean up.
+    manager.disconnect();
+    deleteCluster(clusterName);
+  }
+
+  /*
+   * This test simulates that long time cost in resetting handlers causes zk session expiry, and
+   * ephemeral node should not be created by this expired zk session.
+   * This test follows belows steps:
+   * 1. Original session S0 initialized
+   * 2. S0 expired, new session S1 created
+   * 3. S1 spends a long time resetting handlers
+   * 4. S1 expired, new session S2 created
+   * 5. S1 completes resetting handlers, live instance should not be created by the expired S1
+   * 6. S2 is valid and creates live instance.
+   */
+  @Test
+  public void testSessionExpiredWhenResetHandlers() throws Exception {
+    final String className = TestHelper.getTestClassName();
+    final String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    final ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(ZK_ADDR));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR,
+        12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave",
+        true); // do rebalance
+
+    // 1. Original session S0 initialized
+    final String instanceName = "localhost_12918";
+    final BlockingResetHandlersZkHelixManager manager =
+        new BlockingResetHandlersZkHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT,
+            ZK_ADDR);
+
+    manager.connect();
+
+    final String originalSessionId = manager.getSessionId();
+    final long initResetHandlersStartTime = manager.getResetHandlersStartTime();
+    final LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+
+    // Verify current live instance.
+    Assert.assertNotNull(liveInstance);
+    Assert.assertEquals(liveInstance.getEphemeralOwner(), originalSessionId);
+
+    final int handlerCount = manager.getHandlers().size();
+    final long originalCreationTime = liveInstance.getStat().getCreationTime();
+    final CountDownLatch mainThreadBlocker = new CountDownLatch(1);
+    final CountDownLatch helperThreadBlocker = new CountDownLatch(1);
+
+    // Helper thread to help verify zk session states, async expire S1, proceed S1 to reset
+    // handlers and release main thread to verify results.
+    new Thread(() -> {
+      try {
+        // Wait for new session S1 is established and starting to reset handlers.
+        TestHelper.verify(() -> !(manager.getSessionId().equals(originalSessionId))
+            && manager.getResetHandlersStartTime() > initResetHandlersStartTime, 3000L);
+
+        // S1's info.
+        final String lastSessionId = manager.getSessionId();
+        final long lastResetHandlersStartTime = manager.getResetHandlersStartTime();
+
+        ((ZkClient) manager.getZkClient()).getEventLock().lockInterruptibly();
+        try {
+          // 4. S1 expired, new session S2 created
+          ZkTestHelper.asyncExpireSession(manager.getZkClient());
+
+          // Wait and verify the new session S2 is established.
+          TestHelper.verify(() -> !((ZKUtil.toHexSessionId(manager.getZkClient().getSessionId()))
+              .equals(lastSessionId)), 3000L);
+        } catch (Exception ignored) {
+          // Ignored.
+        } finally {
+          // Unlock to start processing event again.
+          ((ZkClient) manager.getZkClient()).getEventLock().unlock();
+        }
+
+        // Proceed S1 to complete reset handlers and try to create live instance.
+        manager.proceedResetHandlers();
+
+        // Wait for S2 to handle new session.
+        TestHelper.verify(() -> !(manager.getSessionId().equals(lastSessionId))
+            && manager.getResetHandlersStartTime() > lastResetHandlersStartTime, 3000L);
+
+        // Notify main thread to verify result: expired S1 should not create live instance.
+        mainThreadBlocker.countDown();
+
+        // Wait for notification from main thread to proceed S2.
+        helperThreadBlocker.await();
+
+        // Proceed S2.
+        // 6. S2 is valid and creates live instance.
+        manager.proceedResetHandlers();
+
+        final String latestSessionId = ZKUtil.toHexSessionId(manager.getZkClient().getSessionId());
+
+        TestHelper.verify(() -> {
+          // Newly created live instance should be created by the latest session
+          // and have a new creation time.
+          LiveInstance newLiveInstance =
+              accessor.getProperty(keyBuilder.liveInstance(instanceName));
+          return newLiveInstance != null
+              && newLiveInstance.getStat().getCreationTime() != originalCreationTime
+              && newLiveInstance.getEphemeralOwner().equals(latestSessionId);
+        }, 2000L);
+      } catch (Exception ignored) {
+        // Ignored.
+      }
+
+      // Notify the main thread that live instance is already created by session S2.
+      mainThreadBlocker.countDown();
+
+    }).start();
+
+    // Lock zk event processing to simulate a long backlog queue.
+    ((ZkClient) manager.getZkClient()).getEventLock().lockInterruptibly();
+    try {
+      // 2. S0 expired, new session S1 created
+      ZkTestHelper.asyncExpireSession(manager.getZkClient());
+      // 3. S1 spends a long time resetting handlers during this period.
+
+      // Wait and verify the zookeeper is closed.
+      Assert.assertTrue(TestHelper.verify(
+          () -> !((ZkClient) manager.getZkClient()).getConnection().getZookeeperState().isAlive(),
+          3000L));
+    } finally {
+      // Unlock to start processing event again.
+      ((ZkClient) manager.getZkClient()).getEventLock().unlock();
+    }
+
+    // Wait for S1 completing resetting handlers.
+    mainThreadBlocker.await();
+
+    // 5. S1 completes resetting handlers, live instance should not be created by the expired S1
+    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
+
+    // Notify helper thread to proceed S2.
+    helperThreadBlocker.countDown();
+
+    // Wait for live instance being created by the new session S2.
+    mainThreadBlocker.await();
+
+    // From now on, the live instance is already created by S2.
+    // The latest(the final new one S2) session id that is valid.
+    final String latestSessionId = ZKUtil.toHexSessionId(manager.getZkClient().getSessionId());
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      // Newly created live instance should be created by the latest session
+      // and have a new creation time.
+      LiveInstance newLiveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+      return newLiveInstance != null
+          && newLiveInstance.getStat().getCreationTime() != originalCreationTime
+          && newLiveInstance.getEphemeralOwner().equals(latestSessionId);
+    }, 2000L));
+
+    // All the callback handlers shall be recovered.
+    Assert.assertTrue(TestHelper.verify(() -> manager.getHandlers().size() == handlerCount, 1000L));
+    Assert.assertTrue(TestHelper.verify(
+        () -> manager.getHandlers().stream().allMatch(CallbackHandler::isReady), 3000L));
+
+    // Clean up.
+    manager.disconnect();
+    deleteCluster(clusterName);
+  }
+
+  static class BlockingHandleNewSessionZkHelixManager extends ZKHelixManager {
     private final Semaphore newSessionHandlingCount = new Semaphore(1);
+    private long handleNewSessionStartTime = 0L;
+    private long handleNewSessionEndTime = 0L;
 
-    public BlockingZkHelixManager(String clusterName, String instanceName,
+    public BlockingHandleNewSessionZkHelixManager(String clusterName, String instanceName,
         InstanceType instanceType, String zkAddress) {
       super(clusterName, instanceName, instanceType, zkAddress);
     }
@@ -183,7 +479,9 @@ public class TestHandleNewSession extends ZkTestBase {
+    /*
+     * Acquires one permit from newSessionHandlingCount (blocking until one is available), then
+     * records the wall-clock time just before and just after delegating to
+     * super.handleNewSession(sessionId). The end time is only recorded if the super call
+     * returns normally.
+     * Presumably proceedNewSessionHandling() releases the permits that unblock this method -
+     * TODO confirm (its body is not shown here).
+     * NOTE(review): the time fields are read by the test thread through their getters; they are
+     * not declared volatile, so cross-thread visibility is not guaranteed - consider it.
+     */
     @Override
     public void handleNewSession(final String sessionId) throws Exception {
       newSessionHandlingCount.acquire();
+      handleNewSessionStartTime = System.currentTimeMillis();
       super.handleNewSession(sessionId);
+      handleNewSessionEndTime = System.currentTimeMillis();
     }
 
     void proceedNewSessionHandling() {
@@ -193,5 +491,59 @@ public class TestHandleNewSession extends ZkTestBase {
+    /*
+     * Exposes the manager's _handlers list to the test. The field itself is returned rather
+     * than a copy, so callers observe later changes to it.
+     */
     List<CallbackHandler> getHandlers() {
       return _handlers;
     }
+
+    /* Exposes the manager's underlying zk client (the _zkclient field) to the test. */
+    HelixZkClient getZkClient() {
+      return this._zkclient;
+    }
+
+    /* Wall-clock millis at which handleNewSession last started; 0 if it never ran. */
+    long getHandleNewSessionStartTime() {
+      return this.handleNewSessionStartTime;
+    }
+
+    /* Wall-clock millis at which handleNewSession last completed normally; 0 if never. */
+    long getHandleNewSessionEndTime() {
+      return this.handleNewSessionEndTime;
+    }
+  }
+
+  /*
+   * A ZkHelixManager that simulates long time cost in resetting handlers.
+   * Every non-shutdown resetHandlers() call waits (up to 20 seconds) for a semaphore permit
+   * before delegating to the real implementation. The semaphore starts with one permit, so the
+   * first non-shutdown reset proceeds immediately; each later one waits until the test calls
+   * proceedResetHandlers() or the 20 second timeout elapses.
+   */
+  static class BlockingResetHandlersZkHelixManager extends ZKHelixManager {
+    private final Semaphore resetHandlersSemaphore = new Semaphore(1);
+    // Written by the thread that resets handlers and read by test threads, hence volatile.
+    private volatile long resetHandlersStartTime = 0L;
+
+    public BlockingResetHandlersZkHelixManager(String clusterName, String instanceName,
+        InstanceType instanceType, String zkAddress) {
+      super(clusterName, instanceName, instanceType, zkAddress);
+    }
+
+    @Override
+    void resetHandlers(boolean isShutdown) {
+      resetHandlersStartTime = System.currentTimeMillis();
+      if (!isShutdown) {
+        try {
+          // The result is deliberately ignored: after the timeout the reset proceeds anyway,
+          // so a test that never calls proceedResetHandlers() cannot block this forever.
+          resetHandlersSemaphore.tryAcquire(20L, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status instead of swallowing it, so the interrupted thread
+          // (and whoever interrupted it) can still observe the interruption.
+          Thread.currentThread().interrupt();
+        }
+      }
+      super.resetHandlers(isShutdown);
+    }
+
+    /* Releases one permit, letting one blocked (or the next) resetHandlers call proceed. */
+    void proceedResetHandlers() {
+      resetHandlersSemaphore.release();
+    }
+
+    /* Exposes the manager's _handlers list (the field itself, not a copy) to the test. */
+    List<CallbackHandler> getHandlers() {
+      return _handlers;
+    }
+
+    /* Exposes the manager's underlying zk client (the _zkclient field) to the test. */
+    HelixZkClient getZkClient() {
+      return _zkclient;
+    }
+
+    /* Wall-clock millis at which resetHandlers was last entered; 0 if it never ran. */
+    long getResetHandlersStartTime() {
+      return resetHandlersStartTime;
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index 156f59f..fd7bb3c 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -22,9 +22,11 @@ package org.apache.helix.manager.zk;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -33,6 +35,8 @@ import javax.management.ObjectName;
 
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkServer;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.helix.HelixException;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.ZkUnitTestBase;
@@ -43,6 +47,7 @@ import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -54,6 +59,7 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+
 public class TestRawZkClient extends ZkUnitTestBase {
   private final String TEST_TAG = "test_monitor";
   private final String TEST_ROOT = "/my_cluster/IDEALSTATES";
@@ -144,8 +150,8 @@ public class TestRawZkClient extends ZkUnitTestBase {
     for (int i = 0; i < 3; i++) {
       ZkTestHelper.expireSession(_zkClient);
       long newSessionId = _zkClient.getSessionId();
-      Assert.assertTrue(newSessionId > lastSessionId,
-          "New session id should be greater than expired session id.");
+      Assert.assertTrue(newSessionId != lastSessionId,
+          "New session id should not equal to expired session id.");
       lastSessionId = newSessionId;
     }
   }
@@ -461,4 +467,249 @@ public class TestRawZkClient extends ZkUnitTestBase {
       zkServer.shutdown();
     }
   }
+
+  /*
+   * This test checks that a valid session can successfully create an ephemeral node whose
+   * ephemeral owner is that session.
+   */
+  @Test
+  public void testCreateEphemeralWithValidSession() throws Exception {
+    final String className = TestHelper.getTestClassName();
+    final String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR,
+        12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave",
+        true); // do rebalance
+
+    final String originalSessionId = ZKUtil.toHexSessionId(_zkClient.getSessionId());
+    final String path = "/" + methodName;
+    final String data = "Hello Helix";
+
+    // Verify the node does not exist yet.
+    Assert.assertFalse(_zkClient.exists(path));
+
+    // Wait until the ZkClient is connected, so the creation below runs in the current session.
+    Assert.assertTrue(TestHelper
+        .verify(() -> _zkClient.getConnection().getZookeeperState().isConnected(), 1000L));
+
+    try {
+      // Create ephemeral node.
+      _zkClient.createEphemeral(path, data, originalSessionId);
+    } catch (Exception ex) {
+      Assert.fail("Failed to create ephemeral node.", ex);
+    }
+
+    try {
+      // Verify the node is created and its data is correct.
+      Stat stat = new Stat();
+      String nodeData = _zkClient.readData(path, stat, true);
+
+      Assert.assertNotNull(nodeData, "Failed to create ephemeral node: " + path);
+      Assert.assertEquals(nodeData, data, "Data is not correct.");
+      Assert.assertTrue(stat.getEphemeralOwner() != 0L,
+          "Ephemeral owner should NOT be zero because the node is an ephemeral node.");
+      Assert.assertEquals(ZKUtil.toHexSessionId(stat.getEphemeralOwner()), originalSessionId,
+          "Ephemeral node is created by an unexpected session");
+    } finally {
+      // Delete the node to clean up even if a verification above failed; otherwise the
+      // ephemeral node would exist until the session of its owner expires.
+      _zkClient.delete(path);
+    }
+  }
+
+  /*
+   * This test checks that ephemeral creation fails because the expected zk session does not match
+   * the actual zk session.
+   * How this test does is:
+   * 1. Creates a zk client and gets its original session id
+   * 2. Expires the original session, and a new session will be created
+   * 3. Tries to create ephemeral node with the original session
+   * 4. ZkSessionMismatchedException is expected for the creation and ephemeral node is not created
+   */
+  @Test
+  public void testCreateEphemeralWithMismatchedSession() throws Exception {
+    final String className = TestHelper.getTestClassName();
+    final String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR,
+        12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave",
+        true); // do rebalance
+
+    final long originalSessionId = _zkClient.getSessionId();
+    final String originalHexSessionId = ZKUtil.toHexSessionId(originalSessionId);
+    final String path = "/" + methodName;
+
+    // Verify the node does not exist.
+    Assert.assertFalse(_zkClient.exists(path));
+
+    // Expire the original session.
+    ZkTestHelper.expireSession(_zkClient);
+
+    // Wait until the ZkClient has got a new session.
+    Assert.assertTrue(TestHelper.verify(() -> {
+      try {
+        // New session id should not equal to expired session id.
+        return _zkClient.getSessionId() != originalSessionId;
+      } catch (HelixException ex) {
+        return false;
+      }
+    }, 1000L));
+
+    try {
+      // Try to create ephemeral node with the original session.
+      // This creation should NOT be successful because the original session is already expired.
+      _zkClient.createEphemeral(path, "Hello Helix", originalHexSessionId);
+      Assert.fail("Ephemeral node should not be created by the expired session.");
+    } catch (ZkSessionMismatchedException expected) {
+      // Expected because there is a session mismatch.
+    } catch (Exception unexpected) {
+      // Keep the unexpected exception as the cause so its stack trace is reported.
+      Assert.fail("Should not have thrown exception: " + unexpected, unexpected);
+    }
+
+    // Verify the node is not created.
+    Assert.assertFalse(_zkClient.exists(path));
+  }
+
+  /*
+   * Tests that when trying to create an ephemeral node, if connection to zk service is lost,
+   * ConnectionLossException will be thrown in retryUntilConnected and ephemeral creation will fail.
+   * Because ConnectionLossException is not thrown in createEphemeral(), operation retry timeout is
+   * set to 3 seconds and then ZkTimeoutException is thrown. And retry cause message is checked to
+   * see if ConnectionLossException was thrown before retry.
+   */
+  @Test(timeOut = 5 * 60 * 1000L)
+  public void testConnectionLossWhileCreateEphemeral() throws Exception {
+    final String methodName = TestHelper.getTestMethodName();
+
+    final ZkClient zkClient = new ZkClient.Builder()
+        .setZkServer(ZK_ADDR)
+        .setOperationRetryTimeout(3000L) // 3 seconds
+        .build();
+
+    final String expectedSessionId = ZKUtil.toHexSessionId(zkClient.getSessionId());
+    final String path = "/" + methodName;
+    final String data = "data";
+
+    Assert.assertFalse(zkClient.exists(path));
+
+    // Shutdown zk server so zk operations will fail due to disconnection.
+    TestHelper.stopZkServer(_zkServer);
+
+    try {
+      final CountDownLatch countDownLatch = new CountDownLatch(1);
+      final AtomicBoolean running = new AtomicBoolean(true);
+
+      final Thread creationThread = new Thread(() -> {
+        while (running.get()) {
+          try {
+            // Create ephemeral node in the expected session id.
+            zkClient.createEphemeral(path, data, expectedSessionId);
+          } catch (ZkTimeoutException e) {
+            // Verify ConnectionLossException was thrown before retry and timeout.
+            if (e.getMessage().endsWith("Retry was caused by "
+                + KeeperException.ConnectionLossException.Code.CONNECTIONLOSS)) {
+              running.set(false);
+            }
+          }
+        }
+        countDownLatch.countDown();
+      });
+
+      creationThread.start();
+
+      final boolean creationThreadTerminated = countDownLatch.await(10L, TimeUnit.SECONDS);
+      if (!creationThreadTerminated) {
+        running.set(false);
+        creationThread.join(5000L);
+        Assert.fail("Failed to receive a ConnectionLossException after zookeeper has shutdown.");
+      }
+    } finally {
+      zkClient.close();
+      // Recover zk server.
+      _zkServer.start();
+    }
+  }
+
+  /*
+   * Tests that when trying to create an ephemeral node, if connection to zk service is lost,
+   * the creation operation will keep retrying until connected and finally successfully create the
+   * node.
+   * How this test simulates the steps is:
+   * 1. Shuts down zk server
+   * 2. Starts a creation thread to create an ephemeral node in the original zk session
+   * 3. Creation operation loses connection and keeps retrying
+   * 4. Restarts zk server and Zk session is recovered
+   * 5. zk client reconnects successfully and creates an ephemeral node
+   */
+  @Test(timeOut = 5 * 60 * 1000L)
+  public void testRetryUntilConnectedAfterConnectionLoss() throws Exception {
+    final String methodName = TestHelper.getTestMethodName();
+
+    final String expectedSessionId = ZKUtil.toHexSessionId(_zkClient.getSessionId());
+    final String path = "/" + methodName;
+    final String data = "data";
+
+    Assert.assertFalse(_zkClient.exists(path));
+
+    final CountDownLatch countDownLatch = new CountDownLatch(1);
+    // Set only if createEphemeral() returned normally.
+    final AtomicBoolean created = new AtomicBoolean(false);
+
+    // A single createEphemeral() call is enough: it is expected to keep retrying until the
+    // connection is recovered (steps 3-5 above).
+    final Thread creationThread = new Thread(() -> {
+      try {
+        // Create ephemeral node in the expected session id.
+        _zkClient.createEphemeral(path, data, expectedSessionId);
+        created.set(true);
+      } finally {
+        // Release the main thread even if creation failed, so the test fails fast.
+        countDownLatch.countDown();
+      }
+    });
+
+    // Shutdown zk server so zk operations will fail due to disconnection.
+    TestHelper.stopZkServer(_zkServer);
+    try {
+      creationThread.start();
+      // Keep creation thread retrying to connect for 10 seconds.
+      TimeUnit.SECONDS.sleep(10);
+    } finally {
+      // Always restart the zk server, even if interrupted above, so later tests still have one.
+      _zkServer.start();
+    }
+
+    // Wait for creating ephemeral node successfully.
+    final boolean creationThreadTerminated = countDownLatch.await(10, TimeUnit.SECONDS);
+    if (!creationThreadTerminated) {
+      creationThread.join(5000L);
+    }
+    Assert.assertTrue(created.get(), "Failed to reconnect to zk server and create ephemeral node"
+        + " after zk server is recovered.");
+
+    try {
+      Stat stat = new Stat();
+      String nodeData = _zkClient.readData(path, stat, true);
+
+      Assert.assertNotNull(nodeData, "Failed to create ephemeral node: " + path);
+      Assert.assertEquals(nodeData, data, "Data is not correct.");
+      Assert.assertTrue(stat.getEphemeralOwner() != 0L,
+          "Ephemeral owner should NOT be zero because the node is an ephemeral node.");
+      Assert.assertEquals(ZKUtil.toHexSessionId(stat.getEphemeralOwner()), expectedSessionId,
+          "Ephemeral node is created by an unexpected session");
+    } finally {
+      // Delete the node to clean up even if a verification failed; otherwise the ephemeral
+      // node would exist until the session of its owner expires.
+      _zkClient.delete(path);
+    }
+  }
 }