You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/07/18 16:42:01 UTC

[helix] branch metaclient updated: Metaclient leader election - basics (#2558)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new d2023a706 Metaclient leader election - basics (#2558)
d2023a706 is described below

commit d2023a706a63e82409ad1fff34e4feab89e15486
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Tue Jul 18 09:41:55 2023 -0700

    Metaclient leader election - basics (#2558)
    
    
    
    Co-authored-by: Xiaoyuan Lu <xi...@xialu-mn2.linkedin.biz>
---
 .../org/apache/helix/metaclient/api/OpResult.java  |   2 +-
 .../helix/metaclient/datamodel/DataRecord.java     |   5 +
 .../helix/metaclient/impl/zk/ZkMetaClient.java     |  50 +++--
 .../metaclient/impl/zk/util/ZkMetaClientUtil.java  |   2 +-
 .../leaderelection/LeaderElectionClient.java       | 204 ++++++++++++++++++---
 .../recipes/leaderelection/LeaderInfo.java         |  34 +++-
 .../leaderelection/LeaderInfoSerializer.java       |  42 +++++
 .../java/org/apache/helix/metaclient/TestUtil.java |  22 +++
 .../recipes/leaderelection/TestLeaderElection.java |  64 +++++++
 .../datamodel/serializer/ZNRecordSerializer.java   |   2 +-
 .../helix/zookeeper/zkclient/ZkConnection.java     |   1 +
 11 files changed, 366 insertions(+), 62 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java
index effed8543..e3621190a 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java
@@ -26,7 +26,7 @@ import java.util.List;
  */
 public class OpResult {
 
-  enum Type {
+  public enum Type {
     ERRORRESULT,
     GETDATARESULT,
     GETCHILDRENRESULT,
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java b/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java
index 3ed6928f1..91e5409ca 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java
@@ -20,6 +20,7 @@ package org.apache.helix.metaclient.datamodel;
  */
 
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.helix.metaclient.recipes.leaderelection.LeaderInfo;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
@@ -35,4 +36,8 @@ public class DataRecord extends ZNRecord {
   public DataRecord(ZNRecord record) {
     super(record);
   }
+
+  public DataRecord(DataRecord record, String id) {
+    super(record, id);
+  }
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index a1b6eb1ad..7f68ec9ca 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -83,17 +83,15 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   // Lock all activities related to ZkClient connection
   private ReentrantLock _zkClientConnectionMutex = new ReentrantLock();
 
-
   public ZkMetaClient(ZkMetaClientConfig config) {
     _initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
     _reconnectTimeout = config.getMetaClientReconnectPolicy().getAutoReconnectTimeout();
     // TODO: Right new ZkClient reconnect using exp backoff with fixed max backoff interval. We should
     // Allow user to config reconnect policy
-    _zkClient = new ZkClient(
-        new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()),
+    _zkClient = new ZkClient(new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()),
         (int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/,
-        config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(),
-        config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false, true);
+        config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(), config.getMonitorInstanceName(),
+        config.getMonitorRootPathOnly(), false, true);
     _zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor();
     _reconnectStateChangeListener = new ReconnectStateChangeListener();
   }
@@ -102,6 +100,8 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   public void create(String key, Object data) {
     try {
       create(key, data, EntryMode.PERSISTENT);
+    } catch (ZkException e) {
+      throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e);
     } catch (Exception e) {
       throw new MetaClientException(e);
     }
@@ -110,16 +110,18 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   @Override
   public void create(String key, Object data, MetaClientInterface.EntryMode mode) {
 
-    try{
+    try {
       _zkClient.create(key, data, ZkMetaClientUtil.convertMetaClientMode(mode));
-    } catch (ZkException | KeeperException e) {
+    } catch (ZkException e) {
+      throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e);
+    } catch (KeeperException e) {
       throw new MetaClientException(e);
     }
   }
 
   @Override
   public void createWithTTL(String key, T data, long ttl) {
-    try{
+    try {
       _zkClient.createPersistentWithTTL(key, data, ttl);
     } catch (ZkException e) {
       throw translateZkExceptionToMetaclientException(e);
@@ -178,7 +180,6 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
     return _zkClient.readData(key, true);
   }
 
-
   @Override
   public ImmutablePair<T, Stat> getDataAndStat(final String key) {
     try {
@@ -231,8 +232,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   // corresponding callbacks for each operation are invoked in order.
   @Override
   public void setAsyncExecPoolSize(int poolSize) {
-    throw new UnsupportedOperationException(
-        "All async calls are executed in a single thread to maintain sequence.");
+    throw new UnsupportedOperationException("All async calls are executed in a single thread to maintain sequence.");
   }
 
   @Override
@@ -243,8 +243,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
     } catch (ZkException | KeeperException e) {
       throw new MetaClientException(e);
     }
-    _zkClient.asyncCreate(key, data, entryMode,
-          new ZkMetaClientCreateCallbackHandler(cb));
+    _zkClient.asyncCreate(key, data, entryMode, new ZkMetaClientCreateCallbackHandler(cb));
   }
 
   @Override
@@ -258,14 +257,12 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
 
   @Override
   public void asyncGet(String key, AsyncCallback.DataCallback cb) {
-    _zkClient.asyncGetData(key,
-        new ZkMetaClientGetCallbackHandler(cb));
+    _zkClient.asyncGetData(key, new ZkMetaClientGetCallbackHandler(cb));
   }
 
   @Override
   public void asyncCountChildren(String key, AsyncCallback.DataCallback cb) {
-    throw new NotImplementedException(
-        "Currently asyncCountChildren is not supported in ZkMetaClient.");
+    throw new NotImplementedException("Currently asyncCountChildren is not supported in ZkMetaClient.");
     /*
      * TODO:  Only Helix has potential using this API as of now. (ZkBaseDataAccessor.getChildren())
      *  Will move impl from ZkBaseDataAccessor to here when retiring ZkBaseDataAccessor.
@@ -275,8 +272,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
 
   @Override
   public void asyncExist(String key, AsyncCallback.StatCallback cb) {
-    _zkClient.asyncExists(key,
-        new ZkMetaClientExistCallbackHandler(cb));
+    _zkClient.asyncExists(key, new ZkMetaClientExistCallbackHandler(cb));
   }
 
   public void asyncDelete(String key, AsyncCallback.VoidCallback cb) {
@@ -285,16 +281,14 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
 
   @Override
   public void asyncTransaction(Iterable<Op> ops, AsyncCallback.TransactionCallback cb) {
-    throw new NotImplementedException(
-        "Currently asyncTransaction is not supported in ZkMetaClient.");
+    throw new NotImplementedException("Currently asyncTransaction is not supported in ZkMetaClient.");
 
-     //TODO: There is no active use case for Async transaction.
+    //TODO: There is no active use case for Async transaction.
   }
 
   @Override
   public void asyncSet(String key, T data, int version, AsyncCallback.StatCallback cb) {
-    _zkClient.asyncSetData(key, data, version,
-        new ZkMetaClientSetCallbackHandler(cb));
+    _zkClient.asyncSetData(key, data, version, new ZkMetaClientSetCallbackHandler(cb));
   }
 
   @Override
@@ -332,8 +326,8 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   }
 
   @Override
-  public DirectChildSubscribeResult subscribeDirectChildChange(String key,
-      DirectChildChangeListener listener, boolean skipWatchingNonExistNode) {
+  public DirectChildSubscribeResult subscribeDirectChildChange(String key, DirectChildChangeListener listener,
+      boolean skipWatchingNonExistNode) {
     ChildrenSubscribeResult result =
         _zkClient.subscribeChildChanges(key, new DirectChildListenerAdapter(listener), skipWatchingNonExistNode);
     return new DirectChildSubscribeResult(result.getChildren(), result.isInstalled());
@@ -466,7 +460,6 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
         _reconnectMonitorFuture.cancel(true);
         LOG.info("ZkClient reconnect monitor thread is canceled");
       }
-
     } finally {
       _zkClientConnectionMutex.unlock();
     }
@@ -539,8 +532,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
                 cleanUpAndClose(false, true);
               }
             }, _reconnectTimeout, TimeUnit.MILLISECONDS);
-            LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after {}",
-                _reconnectTimeout);
+            LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after {}", _reconnectTimeout);
           }
         } finally {
           _zkClientConnectionMutex.unlock();
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
index dec8711cf..f93e98919 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
@@ -39,8 +39,8 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
-import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index 4f609153b..d5bdb735a 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -19,10 +19,27 @@ package org.apache.helix.metaclient.recipes.leaderelection;
  * under the License.
  */
 
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
 import java.util.List;
 
+import java.util.Set;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.helix.metaclient.api.DataChangeListener;
 import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.api.Op;
+import org.apache.helix.metaclient.api.OpResult;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
+import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.metaclient.api.OpResult.Type.*;
 
 
 /**
@@ -42,7 +59,17 @@ import org.apache.helix.metaclient.factories.MetaClientConfig;
  * When the client is used by a leader election service, one client is created for each participant.
  *
  */
-public class LeaderElectionClient {
+public class LeaderElectionClient implements AutoCloseable {
+
+  private final MetaClientInterface<LeaderInfo> _metaClient;
+  private final String _participant;
+  private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionClient.class);
+
+  // The set of leader election groups that this client has joined.
+  private Set<String> _leaderGroups = new HashSet<>();
+
+  private final static String LEADER_ENTRY_KEY = "/LEADER";
+  ReElectListener _reElectListener = new ReElectListener();
 
   /**
    * Construct a LeaderElectionClient using a user passed in leaderElectionConfig. It creates a MetaClient
@@ -53,25 +80,37 @@ public class LeaderElectionClient {
    * @param metaClientConfig The config used to create an metaclient.
    */
   public LeaderElectionClient(MetaClientConfig metaClientConfig, String participant) {
-
+    _participant = participant;
+    if (metaClientConfig == null) {
+      throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+    }
+    LOG.info("Creating MetaClient for LeaderElectionClient");
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(metaClientConfig.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(
+          metaClientConfig.getConnectionAddress()).setZkSerializer((new LeaderInfoSerializer())).build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + metaClientConfig.getStoreType());
+    }
   }
 
   /**
    * Construct a LeaderElectionClient using a user passed in MetaClient object
-   * When MetaClient is auto closed be cause of being disconnected and auto retry connection timed out, user
+   * When MetaClient is auto closed because of being disconnected and auto retry connection timed out, user
    * will need to create a new MetaClient and a new LeaderElectionClient instance.
    *
    * @param metaClient metaClient object to be used.
    */
-  public LeaderElectionClient(MetaClientInterface metaClient, String participant) {
-
+  public LeaderElectionClient(MetaClientInterface<LeaderInfo> metaClient, String participant) {
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
   /**
    * Returns true if current participant is the current leadership.
    */
   public boolean isLeader(String leaderPath) {
-    return false;
+    return getLeader(leaderPath).equalsIgnoreCase(_participant);
   }
 
   /**
@@ -79,10 +118,11 @@ public class LeaderElectionClient {
    * The Leader Election client maintains and elect an active leader from the participant pool.
    *
    * @param leaderPath The path for leader election.
-   * @return boolean indicating if the operation is succeeded.
+   * @throws RuntimeException if the operation does not succeed.
    */
-  public boolean joinLeaderElectionParticipantPool(String leaderPath) {
-    return false;
+  public void joinLeaderElectionParticipantPool(String leaderPath) {
+    // TODO: create participant entry
+    subscribeAndTryCreateLeaderEntry(leaderPath);
   }
 
   /**
@@ -91,10 +131,46 @@ public class LeaderElectionClient {
    *
    * @param leaderPath The path for leader election.
    * @param userInfo Any additional information to associate with this participant.
-   * @return boolean indicating if the operation is succeeded.
+   * @throws RuntimeException if the operation does not succeed.
    */
-  public boolean joinLeaderElectionParticipantPool(String leaderPath, Object userInfo) {
-    return false;
+  public void joinLeaderElectionParticipantPool(String leaderPath, LeaderInfo userInfo) {
+    // TODO: create participant entry with info
+    subscribeAndTryCreateLeaderEntry(leaderPath);
+  }
+
+  private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
+    _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false);
+    LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY);
+    leaderInfo.setLeaderName(_participant);
+
+    try {
+      // try to create leader entry, assuming leader election group node is already there
+      _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo, MetaClientInterface.EntryMode.EPHEMERAL);
+    } catch (MetaClientNodeExistsException ex) {
+      LOG.info("Already a leader for group {}", leaderPath);
+    } catch (MetaClientNoNodeException ex) {
+      try {
+        // try to create leader path root entry
+        _metaClient.create(leaderPath, null);
+      } catch (MetaClientNodeExistsException ignored) {
+        // root entry created by other client, ignore
+      } catch (MetaClientNoNodeException e) {
+        // Parent entry of user provided leader election group path missing.
+        // (e.g. `/a/b` not created in user specified leader election group path /a/b/c/LeaderGroup)
+        throw new MetaClientException("Parent entry in leaderGroup path" + leaderPath + " does not exist.");
+      }
+      try {
+        // try to create leader node again.
+        _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo, MetaClientInterface.EntryMode.EPHEMERAL);
+      } catch (MetaClientNoNodeException e) {
+        // Leader group root entry is gone after we checked at outer catch block.
+        // Meaning other client removed the group. Throw ConcurrentModificationException.
+        throw new ConcurrentModificationException(
+            "Other client trying to modify the leader election group at the same time, please retry.", ex);
+      }
+    }
+
+    _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY);
   }
 
   /**
@@ -106,23 +182,64 @@ public class LeaderElectionClient {
    * Throws exception if the participant is not in the pool.
    *
    * @param leaderPath The path for leader election.
-   * @return boolean indicating if the operation is succeeded.
+   * @throws RuntimeException if the operation does not succeed.
    *
-   * @throws RuntimeException If the participant did not join participant pool via this client. // TODO: define exp type
+   * @throws RuntimeException If the participant did not join participant pool via this client.
    */
-  public boolean exitLeaderElectionParticipantPool(String leaderPath) {
-    return false;
+  public void exitLeaderElectionParticipantPool(String leaderPath) {
+    _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener);
+    // TODO: remove from pool folder
+    relinquishLeaderHelper(leaderPath, true);
   }
 
   /**
-   * Releases leadership for participant.
+   * Releases leadership for participant. Still stays in the participant pool.
    *
    * @param leaderPath The path for leader election.
    *
    * @throws RuntimeException if the leadership is not owned by this participant, or if the
-   *                          participant did not join participant pool via this client. // TODO: define exp type
+   *                          participant did not join participant pool via this client.
    */
   public void relinquishLeader(String leaderPath) {
+    relinquishLeaderHelper(leaderPath, false);
+  }
+
+  /**
+   * Relinquishes leadership and, if configured, also exits the leader election participant pool.
+   * @param leaderPath
+   * @param exitLeaderElectionParticipantPool
+   */
+  private void relinquishLeaderHelper(String leaderPath, Boolean exitLeaderElectionParticipantPool) {
+    String key = leaderPath + LEADER_ENTRY_KEY;
+    // if current client is in the group
+    if (!_leaderGroups.contains(key)) {
+      throw new MetaClientException("Participant is not in the leader election group");
+    }
+    // When exiting the pool, remove the leader path from `_leaderGroups` right after the membership check above.
+    // This prevents a race condition in the Zk implementation:
+    // if there are delays in the ZkClient event queue, the leader election client may receive the leader
+    // deleted event after unsubscribeDataChange. The path must be removed from the in-memory `_leaderGroups` set
+    // before deleting the ZNode, so that the handler in ReElectListener won't recreate the leader node.
+    if (exitLeaderElectionParticipantPool) {
+      _leaderGroups.remove(leaderPath + LEADER_ENTRY_KEY);
+    }
+    // check if current participant is the leader
+    // read data and stats, check, and multi check + delete
+    ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = _metaClient.getDataAndStat(key);
+    if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
+      int expectedVersion = tup.right.getVersion();
+      List<Op> ops = Arrays.asList(Op.check(key, expectedVersion), Op.delete(key, expectedVersion));
+      // Execute the version check and the delete together as a single transaction
+      List<OpResult> opResults = _metaClient.transactionOP(ops);
+      if (opResults.get(0).getType() == ERRORRESULT) {
+        if (isLeader(leaderPath)) {
+          // Participant re-elected as leader.
+          throw new ConcurrentModificationException("Concurrent operation, please retry");
+        } else {
+          LOG.info("Someone else is already leader");
+        }
+      }
+    }
   }
 
   /**
@@ -133,6 +250,12 @@ public class LeaderElectionClient {
    * @throws RuntimeException when leader path does not exist. // TODO: define exp type
    */
   public String getLeader(String leaderPath) {
+    LeaderInfo leaderInfo = _metaClient.get(leaderPath + LEADER_ENTRY_KEY);
+    return leaderInfo == null ? null : leaderInfo.getLeaderName();
+  }
+
+  public LeaderInfo getParticipantInfo(String leaderPath) {
+    // TODO: add getParticipantInfo impl
     return null;
   }
 
@@ -158,14 +281,14 @@ public class LeaderElectionClient {
    * get's auto-deleted after TTL or session timeout) or a new leader comes up, it notifies all
    * participants who have been listening on entryChange event.
    *
-   * An listener will still be installed if the path does not exists yet.
+   * A listener will still be installed if the path does not exist yet.
    *
    * @param leaderPath The path for leader election that listener is interested for change.
    * @param listener An implementation of LeaderElectionListenerInterface
-   * @return an boolean value indicating if registration is success.
+   * @return A boolean value indicating whether registration succeeded.
    */
-  public boolean subscribeLeadershipChanges(String leaderPath,
-      LeaderElectionListenerInterface listener) {
+  public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) {
+    //TODO: add converter class for LeaderElectionListenerInterface
     return false;
   }
 
@@ -173,8 +296,41 @@ public class LeaderElectionClient {
    * @param leaderPath The path for leader election that listener is no longer interested for change.
    * @param listener An implementation of LeaderElectionListenerInterface
    */
-  public void unsubscribeLeadershipChanges(String leaderPath,
-      LeaderElectionListenerInterface listener) {
+  public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) {
+  }
+
+  @Override
+  public void close() throws Exception {
+
+    // exit all previous joined leader election groups
+    for (String leaderGroup : _leaderGroups) {
+      String leaderGroupPathName =
+          leaderGroup.substring(0, leaderGroup.length() - LEADER_ENTRY_KEY.length() /*remove '/LEADER' */);
+      exitLeaderElectionParticipantPool(leaderGroupPathName);
+    }
+
+    // TODO: if last participant, remove folder
+    _metaClient.disconnect();
+  }
+
+  class ReElectListener implements DataChangeListener {
+
+    @Override
+    public void handleDataChange(String key, Object data, ChangeType changeType) throws Exception {
+      if (changeType == ChangeType.ENTRY_CREATED) {
+        LOG.info("new leader {} for leader election group {}.", ((LeaderInfo) data).getLeaderName(), key);
+      } else if (changeType == ChangeType.ENTRY_DELETED) {
+        if (_leaderGroups.contains(key)) {
+          LeaderInfo lf = new LeaderInfo("LEADER");
+          lf.setLeaderName(_participant);
+          try {
+            _metaClient.create(key, lf, MetaClientInterface.EntryMode.EPHEMERAL);
+          } catch (MetaClientNodeExistsException ex) {
+            LOG.info("Already a leader {} for leader election group {}.", ((LeaderInfo) data).getLeaderName(), key);
+          }
+        }
+      }
+    }
   }
 }
 
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
index 9e817c1c9..ab0562502 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
@@ -19,27 +19,49 @@ package org.apache.helix.metaclient.recipes.leaderelection;
  * under the License.
  */
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.helix.metaclient.datamodel.DataRecord;
 
 
 /**
  * This is the data represent leader election info of a leader election path.
  */
-public class LeaderInfo {
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class LeaderInfo extends DataRecord {
 
-  private String  _leaderElectionGroupName;
-  private final DataRecord _record;
+  public LeaderInfo(DataRecord dataRecord) {
+    super(dataRecord);
+  }
 
+  @JsonCreator
+  public LeaderInfo(@JsonProperty("id") String id) {
+    super(id);
+  }
 
-  public LeaderInfo( String leaderElectionGroupName) {
-    _leaderElectionGroupName = leaderElectionGroupName;
-    _record = new DataRecord(_leaderElectionGroupName);
+  public LeaderInfo(LeaderInfo info, String id) {
+    super(info, id);
   }
 
+
   public enum LeaderAttribute {
     LEADER_NAME,
     PARTICIPANTS
   }
 
+@JsonIgnore(true)
+public String getLeaderName() {
+    return getSimpleField("LEADER_NAME");
+  }
+
+  @JsonIgnore(true)
+  public void setLeaderName(String id) {
+     setSimpleField("LEADER_NAME", id);
+  }
+
 
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfoSerializer.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfoSerializer.java
new file mode 100644
index 000000000..fd6d256f6
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfoSerializer.java
@@ -0,0 +1,42 @@
+package org.apache.helix.metaclient.recipes.leaderelection;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import java.io.ByteArrayInputStream;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.util.GZipCompressionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LeaderInfoSerializer extends ZNRecordSerializer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaderInfoSerializer.class);
+
+  @Override
+  public Object deserialize(byte[] bytes) {
+    if (bytes == null || bytes.length == 0) {
+      // reading a parent/null node
+      return null;
+    }
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+
+    mapper.enable(MapperFeature.AUTO_DETECT_FIELDS);
+    mapper.enable(MapperFeature.AUTO_DETECT_SETTERS);
+    mapper.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+    try {
+      // Decompress the data if it is already compressed
+      if (GZipCompressionUtil.isCompressed(bytes)) {
+        byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais);
+        bais = new ByteArrayInputStream(uncompressedBytes);
+      }
+
+      return mapper.readValue(bais, LeaderInfo.class);
+    } catch (Exception e) {
+      LOG.error("Exception during deserialization of bytes: {}", new String(bytes), e);
+      return null;
+    }
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java b/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java
new file mode 100644
index 000000000..9b20c8e53
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java
@@ -0,0 +1,22 @@
+package org.apache.helix.metaclient;
+
+public class TestUtil {
+  public static final long WAIT_DURATION = 6 * 1000L;
+  public interface Verifier {
+    boolean verify()
+        throws Exception;
+  }
+
+  public static boolean verify(Verifier verifier, long timeout)
+      throws Exception {
+    long start = System.currentTimeMillis();
+    do {
+      boolean result = verifier.verify();
+      if (result || (System.currentTimeMillis() - start) > timeout) {
+        return result;
+      }
+      Thread.sleep(50);
+    } while (true);
+  }
+
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
new file mode 100644
index 000000000..3a45d97ce
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -0,0 +1,64 @@
+package org.apache.helix.metaclient.recipes.leaderelection;
+
+import org.apache.helix.metaclient.TestUtil;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestLeaderElection extends ZkMetaClientTestBase {
+
+  private static final String PARTICIPANT_NAME1 = "participant_1";
+  private static final String PARTICIPANT_NAME2 = "participant_2";
+  private static final String LEADER_PATH = "/LEADER_ELECTION_GROUP_1";
+
+  public LeaderElectionClient createLeaderElectionClient(String participantName) {
+    MetaClientConfig.StoreType storeType = MetaClientConfig.StoreType.ZOOKEEPER;
+    MetaClientConfig config = new MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR)
+        .setStoreType(storeType).build();
+    return new LeaderElectionClient(config, participantName);
+  }
+
+  @Test
+  public void testAcquireLeadership() throws Exception {
+    // create 2 clients representing 2 participants
+    LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+    LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+    clt1.joinLeaderElectionParticipantPool(LEADER_PATH);
+    clt2.joinLeaderElectionParticipantPool(LEADER_PATH);
+    // First client joining the leader election group should be current leader
+    Assert.assertTrue(TestUtil.verify(() -> {
+      return (clt1.getLeader(LEADER_PATH) != null);
+    }, TestUtil.WAIT_DURATION));
+    Assert.assertNotNull(clt1.getLeader(LEADER_PATH));
+    Assert.assertEquals(clt1.getLeader(LEADER_PATH), clt2.getLeader(LEADER_PATH));
+    Assert.assertEquals(clt1.getLeader(LEADER_PATH), PARTICIPANT_NAME1);
+
+    // client 1 exit leader election group, and client 2 should be current leader.
+    clt1.exitLeaderElectionParticipantPool(LEADER_PATH);
+    Assert.assertTrue(TestUtil.verify(() -> {
+      return (clt1.getLeader(LEADER_PATH) != null);
+    }, TestUtil.WAIT_DURATION));
+    Assert.assertTrue(TestUtil.verify(() -> {
+      return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME2));
+    }, TestUtil.WAIT_DURATION));
+
+    // client1 join and client2 leave. client 1 should be leader.
+    clt1.joinLeaderElectionParticipantPool(LEADER_PATH);
+    clt2.exitLeaderElectionParticipantPool(LEADER_PATH);
+    Assert.assertTrue(TestUtil.verify(() -> {
+      return (clt1.getLeader(LEADER_PATH) != null);
+    }, TestUtil.WAIT_DURATION));
+    Assert.assertTrue(TestUtil.verify(() -> {
+      return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME1));
+    }, TestUtil.WAIT_DURATION));
+    Assert.assertTrue(clt1.isLeader(LEADER_PATH));
+    Assert.assertFalse(clt2.isLeader(LEADER_PATH));
+
+    clt1.close();
+    clt2.close();
+  }
+
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
index 5838cfad7..0cc3215bb 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 public class ZNRecordSerializer implements ZkSerializer {
   private static final Logger LOG = LoggerFactory.getLogger(ZNRecordSerializer.class);
 
-  private static ObjectMapper mapper = new ObjectMapper()
+  protected static ObjectMapper mapper = new ObjectMapper()
       // TODO: remove it after upgrading ZNRecord's annotations to Jackson 2
       .setAnnotationIntrospector(new CodehausJacksonIntrospector());
 
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
index 376409231..589425462 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
@@ -260,6 +260,7 @@ public class ZkConnection implements IZkConnection {
   private void lookupGetChildrenMethod() {
     _getChildrenMethod = doLookUpGetChildrenMethod();
 
+    System.out.println(" ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED " + GETCHILDREN_PAGINATION_DISABLED);
     LOG.info("Pagination config {}={}, method to be invoked: {}",
         ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED, GETCHILDREN_PAGINATION_DISABLED,
         _getChildrenMethod.getName());