You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/06 02:56:33 UTC

[2/3] git commit: [HELIX-259] add HelixConnection, rb=14728

[HELIX-259] add HelixConnection, rb=14728


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/6210b0d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/6210b0d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/6210b0d2

Branch: refs/heads/helix-logical-model
Commit: 6210b0d2d55adebe0f1f56a3ab27d76e7e001277
Parents: 842035a
Author: zzhang <zz...@apache.org>
Authored: Tue Nov 5 17:55:45 2013 -0800
Committer: zzhang <zz...@apache.org>
Committed: Tue Nov 5 17:55:45 2013 -0800

----------------------------------------------------------------------
 .../org/apache/helix/HelixAutoController.java   |  43 ++
 .../java/org/apache/helix/HelixConnection.java  | 253 ++++++++
 .../helix/HelixConnectionStateListener.java     |  13 +
 .../java/org/apache/helix/HelixController.java  |  18 +
 .../java/org/apache/helix/HelixParticipant.java |  37 ++
 .../main/java/org/apache/helix/HelixRole.java   |  40 ++
 .../java/org/apache/helix/HelixService.java     |  16 +
 .../helix/api/accessor/ClusterAccessor.java     |   2 +-
 .../manager/zk/HelixConnectionAdaptor.java      | 296 +++++++++
 .../helix/manager/zk/ZkBaseDataAccessor.java    |   7 -
 .../helix/manager/zk/ZkHelixAutoController.java | 114 ++++
 .../helix/manager/zk/ZkHelixConnection.java     | 605 +++++++++++++++++++
 .../helix/manager/zk/ZkHelixController.java     | 236 ++++++++
 .../helix/manager/zk/ZkHelixLeaderElection.java | 148 +++++
 .../helix/manager/zk/ZkHelixParticipant.java    | 475 +++++++++++++++
 .../apache/helix/monitoring/StatusDumpTask.java | 166 +++++
 .../participant/HelixStateMachineEngine.java    | 108 +++-
 .../helix/participant/StateMachineEngine.java   |  66 +-
 .../statemachine/HelixStateModelFactory.java    |  99 +++
 .../HelixStateModelFactoryAdaptor.java          |  17 +
 .../statemachine/StateModelFactory.java         |   4 +
 .../helix/integration/TestHelixConnection.java  | 151 +++++
 22 files changed, 2878 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAutoController.java b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
new file mode 100644
index 0000000..7ad9218
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
@@ -0,0 +1,43 @@
+package org.apache.helix;
+
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.participant.StateMachineEngine;
+
+/**
+ * Autonomous controller: a single role with both controller and participant capabilities
+ */
+public interface HelixAutoController extends HelixRole, HelixService, HelixConnectionStateListener {
+  /**
+   * Get the controller id
+   * @return controller id
+   */
+  ControllerId getControllerId();
+
+  /**
+   * Get the state machine engine
+   * @return state machine engine
+   */
+  StateMachineEngine getStateMachineEngine();
+
+  /**
+   * Add a pre-connect callback
+   * @param callback the pre-connect callback to add
+   */
+  void addPreConnectCallback(PreConnectCallback callback);
+
+  /**
+   * Set the LiveInstanceInfoProvider that is invoked before creating liveInstance.<br>
+   * This allows applications to provide additional information that will be published to zookeeper
+   * and become available for discovery<br>
+   * @see LiveInstanceInfoProvider#getAdditionalLiveInstanceInfo()
+   * @param liveInstanceInfoProvider provider of the additional live-instance information
+   */
+  void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider);
+
+  /**
+   * Tell if this controller is leader of cluster
+   * @return true if this controller is the leader of the cluster, false otherwise
+   */
+  boolean isLeader();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/HelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnection.java b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
new file mode 100644
index 0000000..7551673
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
@@ -0,0 +1,253 @@
+package org.apache.helix;
+
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.accessor.ParticipantAccessor;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.store.HelixPropertyStore;
+
+/**
+ * Helix connection (aka helix manager): factory for roles/accessors and registry for listeners
+ */
+public interface HelixConnection {
+
+  /**
+   * Start the connection
+   */
+  void connect();
+
+  /**
+   * Close the connection
+   */
+  void disconnect();
+
+  /**
+   * Test if the connection is started
+   * @return true if connection is started, false otherwise
+   */
+  boolean isConnected();
+
+  /**
+   * Get the session id
+   * @return session id of current connection
+   */
+  SessionId getSessionId();
+
+  /**
+   * Get the session timeout
+   * @return session timeout in milliseconds
+   */
+  int getSessionTimeout();
+
+  /**
+   * Create a helix-participant
+   * @param clusterId id of the cluster the participant belongs to
+   * @param participantId id of the participant
+   * @return helix-participant
+   */
+  HelixParticipant createParticipant(ClusterId clusterId, ParticipantId participantId);
+
+  /**
+   * Create a helix-controller
+   * @param clusterId id of the cluster the controller belongs to
+   * @param controllerId id of the controller
+   * @return helix-controller
+   */
+  HelixController createController(ClusterId clusterId, ControllerId controllerId);
+
+  /**
+   * Create a cluster-accessor
+   * @param clusterId id of the cluster to access
+   * @return cluster-accessor
+   */
+  ClusterAccessor createClusterAccessor(ClusterId clusterId);
+
+  /**
+   * Create a resource accessor
+   * @param clusterId id of the cluster to access
+   * @return resource accessor
+   */
+  ResourceAccessor createResourceAccessor(ClusterId clusterId);
+
+  /**
+   * Create a participant accessor
+   * @param clusterId id of the cluster to access
+   * @return participant-accessor
+   */
+  ParticipantAccessor createParticipantAccessor(ClusterId clusterId);
+
+  /**
+   * Provides admin interface to setup and modify cluster
+   * @return instantiated HelixAdmin
+   */
+  HelixAdmin createClusterManagmentTool();
+
+  /**
+   * Create a default property-store for a cluster
+   * @param clusterId id of the cluster
+   * @return property-store
+   */
+  HelixPropertyStore<ZNRecord> createPropertyStore(ClusterId clusterId);
+
+  /**
+   * Create a data-accessor
+   * @param clusterId id of the cluster to access
+   * @return data-accessor
+   */
+  HelixDataAccessor createDataAccessor(ClusterId clusterId);
+
+  /**
+   * Get the config accessor
+   * TODO replace with new ConfigAccessor
+   * @return config accessor
+   */
+  @Deprecated
+  ConfigAccessor getConfigAccessor();
+
+  /**
+   * Add an ideal state change listener
+   * @param role helix role adding the listener
+   * @param listener ideal state change listener
+   * @param clusterId id of the cluster
+   */
+  void addIdealStateChangeListener(HelixRole role, IdealStateChangeListener listener,
+      ClusterId clusterId);
+
+  /**
+   * Add a controller message listener
+   * @param role helix role adding the listener
+   * @param listener message listener
+   * @param clusterId id of the cluster
+   */
+  void addControllerMessageListener(HelixRole role, MessageListener listener, ClusterId clusterId);
+
+  /**
+   * Add a controller listener
+   * @param role helix role adding the listener
+   * @param listener controller change listener
+   * @param clusterId id of the cluster
+   */
+  void addControllerListener(HelixRole role, ControllerChangeListener listener, ClusterId clusterId);
+
+  /**
+   * Add a live-instance listener using this connection
+   * @param role helix role adding the listener
+   * @param listener live-instance change listener
+   * @param clusterId id of the cluster
+   */
+  void addLiveInstanceChangeListener(HelixRole role, LiveInstanceChangeListener listener,
+      ClusterId clusterId);
+
+  /**
+   * Add a message listener
+   * @param role helix role adding the listener
+   * @param listener message listener
+   * @param clusterId id of the cluster
+   * @param participantId id of the participant
+   */
+  void addMessageListener(HelixRole role, MessageListener listener, ClusterId clusterId,
+      ParticipantId participantId);
+
+  /**
+   * Add a config change listener
+   * @param role helix role adding the listener
+   * @param listener config change listener
+   * @param clusterId id of the cluster
+   */
+  @Deprecated
+  void addConfigChangeListener(HelixRole role, ConfigChangeListener listener, ClusterId clusterId);
+
+  /**
+   * Add an instance config change listener
+   * @see InstanceConfigChangeListener#onInstanceConfigChange(List, NotificationContext)
+   * @param role helix role adding the listener
+   * @param listener instance config change listener
+   * @param clusterId id of the cluster
+   */
+  void addInstanceConfigChangeListener(HelixRole role, InstanceConfigChangeListener listener,
+      ClusterId clusterId);
+
+  /**
+   * Add a config change listener for a scope
+   * @see ScopedConfigChangeListener#onConfigChange(List, NotificationContext)
+   * @param role helix role adding the listener
+   * @param listener scoped config change listener
+   * @param clusterId id of the cluster
+   * @param scope config scope the listener is added for
+   */
+  void addConfigChangeListener(HelixRole role, ScopedConfigChangeListener listener,
+      ClusterId clusterId, ConfigScopeProperty scope);
+
+  /**
+   * Add a current state change listener
+   * @param role helix role adding the listener
+   * @param listener current state change listener
+   * @param clusterId id of the cluster
+   * @param participantId id of the participant
+   * @param sessionId id of the session
+   */
+  void addCurrentStateChangeListener(HelixRole role, CurrentStateChangeListener listener,
+      ClusterId clusterId, ParticipantId participantId, SessionId sessionId);
+
+  /**
+   * Add a health state change listener for the given role, cluster and participant
+   * @see HealthStateChangeListener#onHealthChange(String, List, NotificationContext)
+   * @param listener health state change listener
+   * @param participantId id of the participant (HelixManager callers pass it as instanceName)
+   */
+  void addHealthStateChangeListener(HelixRole role, HealthStateChangeListener listener,
+      ClusterId clusterId, ParticipantId participantId);
+
+  /**
+   * Add an external view change listener for the given role and cluster
+   * @see ExternalViewChangeListener#onExternalViewChange(List, NotificationContext)
+   * @param listener external view change listener
+   */
+  void addExternalViewChangeListener(HelixRole role, ExternalViewChangeListener listener,
+      ClusterId clusterId);
+
+  /**
+   * Remove a listener
+   * @param role helix role that added the listener
+   * @param listener listener to remove
+   * @param key property key
+   * @return result of the removal (presumably true when a listener was removed; TODO confirm)
+   */
+  boolean removeListener(HelixRole role, Object listener, PropertyKey key);
+
+  /**
+   * Add a connection state listener
+   * @param listener connection state listener to add
+   */
+  void addConnectionStateListener(HelixConnectionStateListener listener);
+
+  /**
+   * Remove a connection state listener
+   * @param listener connection state listener to remove
+   */
+  void removeConnectionStateListener(HelixConnectionStateListener listener);
+
+  /**
+   * Create a messaging service using this connection
+   * @param role helix role the messaging service is created for
+   * @return messaging-service
+   */
+  ClusterMessagingService createMessagingService(HelixRole role);
+
+  /**
+   * Get the helix version
+   * @return helix version
+   */
+  String getHelixVersion();
+
+  /**
+   * Get the helix properties
+   * @return helix-properties
+   */
+  HelixManagerProperties getHelixProperties();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java b/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
new file mode 100644
index 0000000..13172d0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
@@ -0,0 +1,13 @@
+package org.apache.helix;
+
+public interface HelixConnectionStateListener {
+  /**
+   * Callback invoked after the connection is established
+   */
+  void onConnected();
+
+  /**
+   * Callback invoked before the connection is disconnected
+   */
+  void onDisconnecting();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/HelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixController.java b/helix-core/src/main/java/org/apache/helix/HelixController.java
new file mode 100644
index 0000000..ce47e3d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixController.java
@@ -0,0 +1,18 @@
+package org.apache.helix;
+
+import org.apache.helix.api.id.ControllerId;
+
+public interface HelixController extends HelixRole, HelixService, HelixConnectionStateListener {
+
+  /**
+   * Get the controller id
+   * @return controller id
+   */
+  ControllerId getControllerId();
+
+  /**
+   * Tell if this controller is leader of cluster
+   * @return true if this controller is the leader of the cluster, false otherwise
+   */
+  boolean isLeader();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixParticipant.java b/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
new file mode 100644
index 0000000..9002b15
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
@@ -0,0 +1,37 @@
+package org.apache.helix;
+
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.participant.StateMachineEngine;
+
+/**
+ * Helix participant: a helix role that owns a participant id and a state machine engine
+ */
+public interface HelixParticipant extends HelixRole, HelixService, HelixConnectionStateListener {
+  /**
+   * Get the participant id
+   * @return participant id
+   */
+  ParticipantId getParticipantId();
+
+  /**
+   * Get the state machine engine
+   * @return state machine engine
+   */
+  StateMachineEngine getStateMachineEngine();
+
+  /**
+   * Add a pre-connect callback
+   * @param callback the pre-connect callback to add
+   */
+  void addPreConnectCallback(PreConnectCallback callback);
+
+  /**
+   * Set the LiveInstanceInfoProvider that is invoked before creating liveInstance.<br>
+   * This allows applications to provide additional information that will be published to zookeeper
+   * and become available for discovery<br>
+   * @see LiveInstanceInfoProvider#getAdditionalLiveInstanceInfo()
+   * @param liveInstanceInfoProvider provider of the additional live-instance information
+   */
+  void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/HelixRole.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixRole.java b/helix-core/src/main/java/org/apache/helix/HelixRole.java
new file mode 100644
index 0000000..9e112d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixRole.java
@@ -0,0 +1,40 @@
+package org.apache.helix;
+
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.Id;
+
+/**
+ * Helix role, i.e. participant, controller, or auto-controller
+ */
+public interface HelixRole {
+  /**
+   * Get the underlying connection
+   * @return helix-connection
+   */
+  HelixConnection getConnection();
+
+  /**
+   * Get the cluster id to which this role belongs
+   * @return cluster id
+   */
+  ClusterId getClusterId();
+
+  /**
+   * Get the id of this helix-role
+   * @return id
+   */
+  Id getId();
+
+  /**
+   * Get the helix-role type
+   * @return instance type of this role
+   */
+  InstanceType getType();
+
+  /**
+   * Get the messaging-service
+   * @return messaging-service
+   */
+  ClusterMessagingService getMessagingService();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/HelixService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixService.java b/helix-core/src/main/java/org/apache/helix/HelixService.java
new file mode 100644
index 0000000..33cc8e5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixService.java
@@ -0,0 +1,16 @@
+package org.apache.helix;
+
+/**
+ * Operational (start/stop) methods of a helix role
+ */
+public interface HelixService {
+  /**
+   * Start the helix service asynchronously
+   */
+  void startAsync();
+
+  /**
+   * Stop the helix service asynchronously
+   */
+  void stopAsync();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 85b8432..80977ab 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -69,10 +69,10 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
-import org.testng.internal.annotations.Sets;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 public class ClusterAccessor {
   private static Logger LOG = Logger.getLogger(ClusterAccessor.class);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
new file mode 100644
index 0000000..b58e4b2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
@@ -0,0 +1,296 @@
+package org.apache.helix.manager.zk;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixAutoController;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.HelixRole;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceConfigChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.LiveInstanceInfoProvider;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ScopedConfigChangeListener;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.log4j.Logger;
+
+/**
+ * Adapts a {@link HelixRole} and its {@link HelixConnection} to the {@link HelixManager}
+ * interface, so the role can be passed to callback-handler and notification-context
+ */
+public class HelixConnectionAdaptor implements HelixManager {
+  private static Logger LOG = Logger.getLogger(HelixConnectionAdaptor.class);
+
+  final HelixRole _role;
+  final HelixConnection _connection;
+  final ClusterId _clusterId;
+  final Id _instanceId;
+  final InstanceType _instanceType;
+  final HelixDataAccessor _accessor;
+  final ClusterMessagingService _messagingService;
+  final SessionId _sessionId;
+
+  public HelixConnectionAdaptor(HelixRole role) {
+    _role = role;
+    _connection = role.getConnection();
+    _sessionId = _connection.getSessionId(); // final: session id is captured once, at construction
+    _clusterId = role.getClusterId();
+    _accessor = _connection.createDataAccessor(_clusterId);
+
+    _instanceId = role.getId();
+    _instanceType = role.getType();
+    _messagingService = role.getMessagingService();
+  }
+
+  @Override
+  public void connect() throws Exception {
+    _connection.connect();
+  }
+
+  @Override
+  public boolean isConnected() {
+    return _connection.isConnected();
+  }
+
+  @Override
+  public void disconnect() {
+    _connection.disconnect();
+  }
+
+  @Override
+  public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception {
+    _connection.addIdealStateChangeListener(_role, listener, _clusterId);
+  }
+
+  @Override
+  public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception {
+    _connection.addLiveInstanceChangeListener(_role, listener, _clusterId);
+  }
+
+  @Override
+  public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
+    _connection.addConfigChangeListener(_role, listener, _clusterId);
+  }
+
+  @Override
+  public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
+      throws Exception {
+    _connection.addInstanceConfigChangeListener(_role, listener, _clusterId);
+  }
+
+  @Override
+  public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
+      throws Exception {
+    _connection.addConfigChangeListener(_role, listener, _clusterId, scope);
+  }
+
+  @Override
+  public void addMessageListener(MessageListener listener, String instanceName) throws Exception {
+    _connection.addMessageListener(_role, listener, _clusterId, ParticipantId.from(instanceName));
+  }
+
+  @Override
+  public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
+      String instanceName, String sessionId) throws Exception {
+    _connection.addCurrentStateChangeListener(_role, listener, _clusterId,
+        ParticipantId.from(instanceName), SessionId.from(sessionId));
+  }
+
+  @Override
+  public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
+      throws Exception {
+    _connection.addHealthStateChangeListener(_role, listener, _clusterId,
+        ParticipantId.from(instanceName));
+  }
+
+  @Override
+  public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
+    _connection.addExternalViewChangeListener(_role, listener, _clusterId);
+  }
+
+  @Override
+  public void addControllerListener(ControllerChangeListener listener) {
+    _connection.addControllerListener(_role, listener, _clusterId);
+  }
+
+  @Override
+  public boolean removeListener(PropertyKey key, Object listener) {
+    return _connection.removeListener(_role, listener, key); // note: argument order differs
+  }
+
+  @Override
+  public HelixDataAccessor getHelixDataAccessor() {
+    return _accessor;
+  }
+
+  @Override
+  public ConfigAccessor getConfigAccessor() {
+    return _connection.getConfigAccessor();
+  }
+
+  @Override
+  public String getClusterName() {
+    return _clusterId.stringify();
+  }
+
+  @Override
+  public String getInstanceName() {
+    return _instanceId.stringify();
+  }
+
+  @Override
+  public String getSessionId() {
+    return _sessionId.stringify();
+  }
+
+  @Override
+  public long getLastNotificationTime() {
+    return 0; // this adaptor does not track notification time; it always reports 0
+  }
+
+  @Override
+  public HelixAdmin getClusterManagmentTool() {
+    return _connection.createClusterManagmentTool();
+  }
+
+  @Override
+  public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
+    // NOTE(review): downcast assumes the connection hands back a ZkHelixPropertyStore -- confirm
+    return (ZkHelixPropertyStore<ZNRecord>) _connection.createPropertyStore(_clusterId);
+  }
+
+  @Override
+  public ParticipantHealthReportCollector getHealthReportCollector() {
+    throw new UnsupportedOperationException(); // no health report collector via this adaptor
+  }
+
+  @Override
+  public InstanceType getInstanceType() {
+    return _instanceType;
+  }
+
+  @Override
+  public String getVersion() {
+    return _connection.getHelixVersion();
+  }
+
+  @Override
+  public HelixManagerProperties getProperties() {
+    return _connection.getHelixProperties();
+  }
+
+  @Override
+  public StateMachineEngine getStateMachineEngine() {
+    StateMachineEngine engine = null;
+    switch (_role.getType()) {
+    case PARTICIPANT:
+      HelixParticipant participant = (HelixParticipant) _role;
+      engine = participant.getStateMachineEngine();
+      break;
+    case CONTROLLER_PARTICIPANT:
+      HelixAutoController autoController = (HelixAutoController) _role;
+      engine = autoController.getStateMachineEngine();
+      break;
+    default: // any other role type: log and return null
+      LOG.info("helix manager type: " + _role.getType()
+          + " does NOT have state-machine-engine");
+      break;
+    }
+
+    return engine;
+  }
+
+  @Override
+  public boolean isLeader() {
+    boolean isLeader = false;
+    switch (_role.getType()) {
+    case CONTROLLER:
+      HelixController controller = (HelixController) _role;
+      isLeader = controller.isLeader();
+      break;
+    case CONTROLLER_PARTICIPANT:
+      HelixAutoController autoController = (HelixAutoController) _role;
+      isLeader = autoController.isLeader();
+      break;
+    default: // any other role type: log and report false
+      LOG.info("helix manager type: " + _role.getType() + " does NOT support leadership");
+      break;
+    }
+    return isLeader;
+  }
+
+  @Override
+  public void startTimerTasks() {
+    throw new UnsupportedOperationException(
+        "HelixConnectionAdaptor does NOT support start timer tasks");
+  }
+
+  @Override
+  public void stopTimerTasks() {
+    throw new UnsupportedOperationException(
+        "HelixConnectionAdaptor does NOT support stop timer tasks");
+  }
+
+  @Override
+  public void addPreConnectCallback(PreConnectCallback callback) {
+    switch (_role.getType()) {
+    case PARTICIPANT:
+      HelixParticipant participant = (HelixParticipant) _role;
+      participant.addPreConnectCallback(callback);
+      break;
+    case CONTROLLER_PARTICIPANT:
+      HelixAutoController autoController = (HelixAutoController) _role;
+      autoController.addPreConnectCallback(callback);
+      break;
+    default: // any other role type: log only, the callback is not registered
+      LOG.info("helix manager type: " + _role.getType()
+          + " does NOT support add pre-connect callback");
+      break;
+    }
+  }
+
+  @Override
+  public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
+    switch (_role.getType()) {
+    case PARTICIPANT:
+      HelixParticipant participant = (HelixParticipant) _role;
+      participant.setLiveInstanceInfoProvider(liveInstanceInfoProvider);
+      break;
+    case CONTROLLER_PARTICIPANT:
+      HelixAutoController autoController = (HelixAutoController) _role;
+      autoController.setLiveInstanceInfoProvider(liveInstanceInfoProvider);
+      break;
+    default: // any other role type: log only, the provider is not set
+      LOG.info("helix manager type: " + _role.getType()
+          + " does NOT support set additional live instance information");
+      break;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 0b112cd..4c7798f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -710,13 +710,6 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       success[i] = (results.get(i)._retCode == RetCode.OK);
     }
 
-    for (int i = 0; i < paths.size(); i++) {
-      String path = paths.get(i);
-      T record = records.get(i);
-      if (path.indexOf("EXTERNALVIEW") != -1) {
-        System.out.println("path: " + path + ", record: " + record + ", success: " + success[i]);
-      }
-    }
     return success;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
new file mode 100644
index 0000000..d9ea445
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
@@ -0,0 +1,114 @@
+package org.apache.helix.manager.zk;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.HelixAutoController;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceInfoProvider;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+public class ZkHelixAutoController implements HelixAutoController { // composes the two roles below
+  private static Logger LOG = Logger.getLogger(ZkHelixAutoController.class);
+
+  final ZkHelixConnection _connection;
+  final ClusterId _clusterId;
+  final ControllerId _controllerId;
+  final ZkHelixParticipant _participant;
+  final ZkHelixController _controller;
+
+  public ZkHelixAutoController(ZkHelixConnection connection, ClusterId clusterId,
+      ControllerId controllerId) {
+    _connection = connection;
+    _clusterId = clusterId;
+    _controllerId = controllerId;
+
+    _participant = // participant id is built from the controller id's string form
+        new ZkHelixParticipant(connection, clusterId, ParticipantId.from(controllerId.stringify()));
+    _controller = new ZkHelixController(connection, clusterId, controllerId);
+  }
+
+  @Override
+  public HelixConnection getConnection() {
+    return _connection;
+  }
+
+  @Override
+  public ClusterId getClusterId() {
+    return _clusterId;
+  }
+
+  @Override
+  public Id getId() {
+    return getControllerId(); // the role id is the controller id
+  }
+
+  @Override
+  public InstanceType getType() {
+    return InstanceType.CONTROLLER_PARTICIPANT;
+  }
+
+  @Override
+  public ClusterMessagingService getMessagingService() {
+    return _participant.getMessagingService(); // delegated to the participant role
+  }
+
+  @Override
+  public void startAsync() {
+    _connection.addConnectionStateListener(this);
+    onConnected(); // run the connect logic right away, in addition to registering as listener
+  }
+
+  @Override
+  public void stopAsync() {
+    _connection.removeConnectionStateListener(this);
+    onDisconnecting(); // run the disconnect logic right away after unregistering
+  }
+
+  @Override
+  public void onConnected() {
+    _controller.reset(); // both roles are reset before either one is initialized
+    _participant.reset();
+
+    _participant.init(); // participant is initialized before controller
+    _controller.init();
+  }
+
+  @Override
+  public void onDisconnecting() {
+    LOG.info("disconnecting " + _controllerId + "(" + getType() + ") from " + _clusterId);
+    _controller.onDisconnecting(); // controller is notified before participant
+    _participant.onDisconnecting();
+  }
+
+  @Override
+  public ControllerId getControllerId() {
+    return _controllerId;
+  }
+
+  @Override
+  public StateMachineEngine getStateMachineEngine() {
+    return _participant.getStateMachineEngine();
+  }
+
+  @Override
+  public void addPreConnectCallback(PreConnectCallback callback) {
+    _participant.addPreConnectCallback(callback);
+  }
+
+  @Override
+  public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
+    _participant.setLiveInstanceInfoProvider(liveInstanceInfoProvider);
+  }
+
+  @Override
+  public boolean isLeader() {
+    return _controller.isLeader(); // leadership is delegated to the controller role
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
new file mode 100644
index 0000000..0717d77
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
@@ -0,0 +1,605 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixConnectionStateListener;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.HelixRole;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceConfigChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ScopedConfigChangeListener;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.accessor.ParticipantAccessor;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class ZkHelixConnection implements HelixConnection, IZkStateListener {
+  private static Logger LOG = Logger.getLogger(ZkHelixConnection.class);
+
+  final String _zkAddr;
+  final int _sessionTimeout;
+  SessionId _sessionId;
+  ZkClient _zkclient;
+  BaseDataAccessor<ZNRecord> _baseAccessor;
+  ConfigAccessor _configAccessor;
+  final Set<HelixConnectionStateListener> _connectionListener =
+      new CopyOnWriteArraySet<HelixConnectionStateListener>();
+
+  final Map<HelixRole, List<CallbackHandler>> _handlers;
+  final HelixManagerProperties _properties;
+
+  /**
+   * Keep track of the timestamps at which the zk state became Disconnected.
+   * If, within a _flappingTimeWindowMs window, the zk state has become Disconnected
+   * more than _maxDisconnectThreshold times, disconnect this connection.
+   */
+  final List<Long> _disconnectTimeHistory = new ArrayList<Long>();
+  final int _flappingTimeWindowMs;
+  final int _maxDisconnectThreshold;
+
+  final ReentrantLock _lock = new ReentrantLock();
+
+  /**
+   * helix version#
+   */
+  final String _version;
+
+  /**
+   * Creates a connection object for the given zookeeper address. No zookeeper client is
+   * created here; that happens in {@link #connect()}.
+   * @param zkAddr zookeeper address, stored as-is
+   */
+  public ZkHelixConnection(String zkAddr) {
+    _zkAddr = zkAddr;
+    _handlers = new HashMap<HelixRole, List<CallbackHandler>>();
+
+    /**
+     * use system properties if available, otherwise fall back to the built-in defaults
+     */
+    _flappingTimeWindowMs =
+        getSystemPropertyAsInt("helixmanager.flappingTimeWindow",
+            ZKHelixManager.FLAPPING_TIME_WINDIOW);
+
+    _maxDisconnectThreshold =
+        getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold",
+            ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
+
+    _sessionTimeout =
+        getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
+
+    // the helix version is read from the bundled properties file
+    _properties = new HelixManagerProperties("cluster-manager-version.properties");
+    _version = _properties.getVersion();
+
+  }
+
+  private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
+    String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue);
+
+    try {
+      int value = Integer.parseInt(valueString);
+      if (value > 0) {
+        return value;
+      }
+    } catch (NumberFormatException e) {
+      LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
+          + ", using default value: " + propertyDefaultValue);
+    }
+
+    return propertyDefaultValue;
+  }
+
+  /**
+   * Creates the zookeeper client, the base/config accessors built on it, subscribes this
+   * object for zk state changes and runs {@link #handleNewSession()} for the first session.
+   * Any exception is logged rather than rethrown; in that case {@link #disconnect()} is
+   * called to clean up whatever was created.
+   */
+  @Override
+  public void connect() {
+    boolean isStarted = false;
+    // take the lock before entering try, so finally never unlocks a lock that was not acquired
+    _lock.lock();
+    try {
+      // use the configured session timeout (zk.session.timeout), which is also what
+      // getSessionTimeout() reports, instead of always using the zkclient default
+      _zkclient =
+          new ZkClient(_zkAddr, _sessionTimeout, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+              new ZNRecordSerializer());
+
+      _baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
+      _configAccessor = new ConfigAccessor(_zkclient);
+
+      _zkclient.subscribeStateChanges(this);
+      // handleNewSession() waits for the connection and notifies registered listeners
+      handleNewSession();
+
+      isStarted = true;
+    } catch (Exception e) {
+      LOG.error("Exception connect", e);
+    } finally {
+      _lock.unlock();
+      if (!isStarted) {
+        disconnect();
+      }
+    }
+  }
+
+  /**
+   * Notifies every connection-state listener via {@code onDisconnecting()} and then closes
+   * the zookeeper client. Does nothing when there is no client. Listener exceptions are
+   * logged individually and do not stop the remaining listeners or the close.
+   */
+  @Override
+  public void disconnect() {
+    // take the lock before entering try, so finally never unlocks a lock that was not acquired
+    _lock.lock();
+    try {
+      // _zkclient is guarded by _lock, so the null check must happen while holding it;
+      // otherwise two concurrent callers could both pass the check and one would hit null
+      if (_zkclient == null) {
+        return;
+      }
+
+      LOG.info("Disconnecting connection: " + this);
+
+      for (final HelixConnectionStateListener listener : _connectionListener) {
+        try {
+          listener.onDisconnecting();
+        } catch (Exception e) {
+          LOG.error("Exception in calling disconnect on listener: " + listener, e);
+        }
+      }
+      _zkclient.close();
+      _zkclient = null;
+      LOG.info("Disconnected connection: " + this);
+    } catch (Exception e) {
+      LOG.error("Exception disconnect", e);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * @return true while a zookeeper client object exists, i.e. between a {@link #connect()}
+   *         that created one and the {@link #disconnect()} that clears it
+   */
+  @Override
+  public boolean isConnected() {
+    _lock.lock();
+    try {
+      final boolean hasClient = (_zkclient != null);
+      return hasClient;
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * @return a new {@code ZkHelixParticipant} bound to this connection for the given ids
+   */
+  @Override
+  public HelixParticipant createParticipant(ClusterId clusterId, ParticipantId participantId) {
+    return new ZkHelixParticipant(this, clusterId, participantId);
+  }
+
+  /**
+   * @return a new {@code ZkHelixController} bound to this connection for the given ids
+   */
+  @Override
+  public HelixController createController(ClusterId clusterId, ControllerId controllerId) {
+    return new ZkHelixController(this, clusterId, controllerId);
+  }
+
+  /**
+   * @return a new cluster accessor backed by a fresh {@link #createDataAccessor(ClusterId)}
+   */
+  @Override
+  public ClusterAccessor createClusterAccessor(ClusterId clusterId) {
+    return new ClusterAccessor(clusterId, createDataAccessor(clusterId));
+  }
+
+  /**
+   * @return a new resource accessor backed by a fresh {@link #createDataAccessor(ClusterId)}
+   */
+  @Override
+  public ResourceAccessor createResourceAccessor(ClusterId clusterId) {
+    return new ResourceAccessor(createDataAccessor(clusterId));
+  }
+
+  /**
+   * @return a new participant accessor backed by a fresh {@link #createDataAccessor(ClusterId)}
+   */
+  @Override
+  public ParticipantAccessor createParticipantAccessor(ClusterId clusterId) {
+    return new ParticipantAccessor(createDataAccessor(clusterId));
+  }
+
+  /**
+   * @return a new {@code ZKHelixAdmin} built on this connection's zookeeper client.
+   *         NOTE(review): _zkclient is only assigned in connect() and cleared in disconnect(),
+   *         so this presumably must be called while connected - confirm with callers
+   */
+  @Override
+  public HelixAdmin createClusterManagmentTool() {
+    return new ZKHelixAdmin(_zkclient);
+  }
+
+  /**
+   * @return a new property store rooted at the cluster's property-store path; it gets its own
+   *         {@code ZkBaseDataAccessor} on this connection's zookeeper client rather than
+   *         reusing {@code _baseAccessor}
+   */
+  @Override
+  public HelixPropertyStore<ZNRecord> createPropertyStore(ClusterId clusterId) {
+    PropertyKey key = new PropertyKey.Builder(clusterId.stringify()).propertyStore();
+    return new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient),
+        key.getPath(), null);
+  }
+
+  /**
+   * @return a new data accessor for the cluster; all accessors created here share this
+   *         connection's single {@code _baseAccessor}
+   */
+  @Override
+  public HelixDataAccessor createDataAccessor(ClusterId clusterId) {
+    return new ZKHelixDataAccessor(clusterId.stringify(), _baseAccessor);
+  }
+
+  /**
+   * @return the config accessor created in {@link #connect()}; null before the first connect
+   */
+  @Override
+  public ConfigAccessor getConfigAccessor() {
+    return _configAccessor;
+  }
+
+  /**
+   * Registers a controller-change listener for the given role on the cluster's controller path.
+   */
+  @Override
+  public void addControllerListener(HelixRole role, ControllerChangeListener listener,
+      ClusterId clusterId) {
+    final PropertyKey controllerKey =
+        new PropertyKey.Builder(clusterId.stringify()).controller();
+    final EventType[] watchedEvents = {
+        EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+    };
+
+    addListener(role, listener, controllerKey, ChangeType.CONTROLLER, watchedEvents);
+  }
+
+  /**
+   * Registers a message listener for the given role on the participant's messages path.
+   */
+  @Override
+  public void addMessageListener(HelixRole role, MessageListener listener, ClusterId clusterId,
+      ParticipantId participantId) {
+    final PropertyKey messagesKey =
+        new PropertyKey.Builder(clusterId.stringify()).messages(participantId.stringify());
+    final EventType[] watchedEvents = {
+        EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+    };
+
+    addListener(role, listener, messagesKey, ChangeType.MESSAGE, watchedEvents);
+  }
+
+  /**
+   * Registers a message listener for the given role on the cluster's controller-messages path.
+   */
+  @Override
+  public void addControllerMessageListener(HelixRole role, MessageListener listener,
+      ClusterId clusterId) {
+    final PropertyKey controllerMessagesKey =
+        new PropertyKey.Builder(clusterId.stringify()).controllerMessages();
+    final EventType[] watchedEvents = {
+        EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+    };
+
+    addListener(role, listener, controllerMessagesKey, ChangeType.MESSAGES_CONTROLLER,
+        watchedEvents);
+  }
+
+  /**
+   * Registers an ideal-state change listener for the given role on the cluster's
+   * ideal-states path.
+   */
+  @Override
+  public void addIdealStateChangeListener(HelixRole role, IdealStateChangeListener listener,
+      ClusterId clusterId) {
+    final PropertyKey idealStatesKey =
+        new PropertyKey.Builder(clusterId.stringify()).idealStates();
+    final EventType[] watchedEvents = {
+        EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated
+    };
+
+    addListener(role, listener, idealStatesKey, ChangeType.IDEAL_STATE, watchedEvents);
+  }
+
+  /**
+   * Registers a live-instance change listener for the given role on the cluster's
+   * live-instances path.
+   */
+  @Override
+  public void addLiveInstanceChangeListener(HelixRole role, LiveInstanceChangeListener listener,
+      ClusterId clusterId) {
+    final PropertyKey liveInstancesKey =
+        new PropertyKey.Builder(clusterId.stringify()).liveInstances();
+    final EventType[] watchedEvents = {
+        EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted,
+        EventType.NodeCreated
+    };
+
+    addListener(role, listener, liveInstancesKey, ChangeType.LIVE_INSTANCE, watchedEvents);
+  }
+
+  /**
+   * Registers a config change listener for the given role on the cluster's instance-configs
+   * path; only children-changed events are watched.
+   */
+  @Override
+  public void addConfigChangeListener(HelixRole role, ConfigChangeListener listener,
+      ClusterId clusterId) {
+    final PropertyKey instanceConfigsKey =
+        new PropertyKey.Builder(clusterId.stringify()).instanceConfigs();
+    final EventType[] watchedEvents = {
+      EventType.NodeChildrenChanged
+    };
+
+    addListener(role, listener, instanceConfigsKey, ChangeType.INSTANCE_CONFIG, watchedEvents);
+  }
+
+  /**
+   * Registers an instance-config change listener for the given role on the cluster's
+   * instance-configs path; only children-changed events are watched.
+   */
+  @Override
+  public void addInstanceConfigChangeListener(HelixRole role,
+      InstanceConfigChangeListener listener, ClusterId clusterId) {
+    final PropertyKey instanceConfigsKey =
+        new PropertyKey.Builder(clusterId.stringify()).instanceConfigs();
+    final EventType[] watchedEvents = {
+      EventType.NodeChildrenChanged
+    };
+
+    addListener(role, listener, instanceConfigsKey, ChangeType.INSTANCE_CONFIG, watchedEvents);
+  }
+
+  /**
+   * Registers a scoped config change listener for the given role. The watched path depends
+   * on the scope: CLUSTER, PARTICIPANT and RESOURCE map to the cluster, instance and resource
+   * config paths. For any other scope an error is logged and nothing is registered.
+   */
+  @Override
+  public void addConfigChangeListener(HelixRole role, ScopedConfigChangeListener listener,
+      ClusterId clusterId, ConfigScopeProperty scope) {
+    final PropertyKey.Builder builder = new PropertyKey.Builder(clusterId.stringify());
+
+    // stays null for scopes that have no config path here
+    PropertyKey scopedKey = null;
+    switch (scope) {
+    case CLUSTER:
+      scopedKey = builder.clusterConfigs();
+      break;
+    case PARTICIPANT:
+      scopedKey = builder.instanceConfigs();
+      break;
+    case RESOURCE:
+      scopedKey = builder.resourceConfigs();
+      break;
+    default:
+      break;
+    }
+
+    if (scopedKey != null) {
+      final EventType[] watchedEvents = {
+        EventType.NodeChildrenChanged
+      };
+      addListener(role, listener, scopedKey, ChangeType.CONFIG, watchedEvents);
+      return;
+    }
+
+    LOG.error("Failed to add listener: " + listener + ", unrecognized config scope: " + scope);
+  }
+
+  /**
+   * Registers a current-state change listener for the given role on the current-states path
+   * of the given participant and session.
+   */
+  @Override
+  public void addCurrentStateChangeListener(HelixRole role, CurrentStateChangeListener listener,
+      ClusterId clusterId, ParticipantId participantId, SessionId sessionId) {
+    final PropertyKey currentStatesKey =
+        new PropertyKey.Builder(clusterId.stringify()).currentStates(
+            participantId.stringify(), sessionId.stringify());
+    final EventType[] watchedEvents = {
+        EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+    };
+
+    addListener(role, listener, currentStatesKey, ChangeType.CURRENT_STATE, watchedEvents);
+  }
+
+  /**
+   * Registers a health-state change listener for the given role on the participant's
+   * health-reports path.
+   */
+  @Override
+  public void addHealthStateChangeListener(HelixRole role, HealthStateChangeListener listener,
+      ClusterId clusterId, ParticipantId participantId) {
+    final PropertyKey healthReportsKey =
+        new PropertyKey.Builder(clusterId.stringify()).healthReports(participantId.stringify());
+    final EventType[] watchedEvents = {
+        EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+    };
+
+    addListener(role, listener, healthReportsKey, ChangeType.HEALTH, watchedEvents);
+  }
+
+  /**
+   * Registers an external-view change listener for the given role on the cluster's
+   * external-views path.
+   */
+  @Override
+  public void addExternalViewChangeListener(HelixRole role, ExternalViewChangeListener listener,
+      ClusterId clusterId) {
+    final PropertyKey externalViewsKey =
+        new PropertyKey.Builder(clusterId.stringify()).externalViews();
+    final EventType[] watchedEvents = {
+        EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+    };
+
+    addListener(role, listener, externalViewsKey, ChangeType.EXTERNAL_VIEW, watchedEvents);
+  }
+
+  /**
+   * Removes every callback handler of the given role whose path equals the key's path and
+   * whose listener equals the given listener, then resets the removed handlers.
+   * @param role role the listener was registered for
+   * @param listener listener to remove
+   * @param key property key whose path the listener was registered on
+   * @return always true, also when nothing was registered for the role
+   */
+  @Override
+  public boolean removeListener(HelixRole role, Object listener, PropertyKey key) {
+    LOG.info("role: " + role + " removing listener: " + listener + " on path: " + key.getPath()
+        + " from connection: " + this);
+    List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
+
+    synchronized (this) {
+      // _handlers is a plain HashMap that addListener() mutates under this same monitor,
+      // so the lookup has to happen inside the synchronized block as well
+      List<CallbackHandler> handlerList = _handlers.get(role);
+      if (handlerList == null) {
+        return true;
+      }
+
+      for (CallbackHandler handler : handlerList) {
+        // compare property-key path and listener reference
+        if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) {
+          toRemove.add(handler);
+        }
+      }
+
+      handlerList.removeAll(toRemove);
+      if (handlerList.isEmpty()) {
+        _handlers.remove(role);
+      }
+    }
+
+    // handler.reset() may modify the handlers list, so do it outside the iteration
+    for (CallbackHandler handler : toRemove) {
+      handler.reset();
+    }
+
+    return true;
+  }
+
+  /**
+   * Adds a listener that is notified on connect (new session) and before disconnect.
+   * The backing collection is a set, so adding an equal listener twice has no extra effect.
+   */
+  @Override
+  public void addConnectionStateListener(HelixConnectionStateListener listener) {
+    // NOTE(review): _connectionListener is a CopyOnWriteArraySet, so this extra monitor
+    // looks redundant - confirm nothing else relies on locking the set itself
+    synchronized (_connectionListener) {
+      _connectionListener.add(listener);
+    }
+  }
+
+  /**
+   * Removes a previously added connection-state listener; no-op if it was never added.
+   */
+  @Override
+  public void removeConnectionStateListener(HelixConnectionStateListener listener) {
+    // NOTE(review): _connectionListener is a CopyOnWriteArraySet, so this extra monitor
+    // looks redundant - confirm nothing else relies on locking the set itself
+    synchronized (_connectionListener) {
+      _connectionListener.remove(listener);
+    }
+  }
+
+  /**
+   * Zk state-change callback. Logs SyncConnected and Expired; on Disconnected it also records
+   * the time of the disconnect and tears the connection down when it is flapping. All other
+   * states are ignored. Runs entirely under {@code _lock}.
+   */
+  @Override
+  public void handleStateChanged(KeeperState state) throws Exception {
+    _lock.lock();
+    try {
+      switch (state) {
+      case SyncConnected: {
+        final ZkConnection currentConnection = (ZkConnection) _zkclient.getConnection();
+        LOG.info("KeeperState: " + state + ", zookeeper:" + currentConnection.getZookeeper());
+        break;
+      }
+      case Disconnected:
+        LOG.info("KeeperState:" + state + ", disconnectedSessionId: " + _sessionId);
+        recordDisconnectAndCheckFlapping();
+        break;
+      case Expired:
+        LOG.info("KeeperState:" + state + ", expiredSessionId: " + _sessionId);
+        break;
+      default:
+        break;
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Tracks the timestamp of a disconnect, then checks the history and disconnects this
+   * connection if it is flapping. Must be called with {@code _lock} held.
+   */
+  private void recordDisconnectAndCheckFlapping() {
+    _disconnectTimeHistory.add(System.currentTimeMillis());
+    if (!isFlapping()) {
+      return;
+    }
+
+    LOG.error("helix-connection: " + this + ", sessionId: " + _sessionId
+        + " is flapping. diconnect it. " + " maxDisconnectThreshold: "
+        + _maxDisconnectThreshold + " disconnects in " + _flappingTimeWindowMs + "ms");
+    disconnect();
+  }
+
+  @Override
+  public void handleNewSession() throws Exception {
+    waitUntilConnected();
+
+    try {
+      _lock.lock();
+
+      for (final HelixConnectionStateListener listener : _connectionListener) {
+        try {
+          listener.onConnected();
+        } catch (Exception e) {
+          LOG.error("Exception invoking connect on listener: " + listener, e);
+        }
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * @return the session id last read in waitUntilConnected(); null until a session id has
+   *         been read for the first time
+   */
+  @Override
+  public SessionId getSessionId() {
+    return _sessionId;
+  }
+
+  /**
+   * @return the helix version read from the helix properties in the constructor
+   */
+  @Override
+  public String getHelixVersion() {
+    return _version;
+  }
+
+  /**
+   * @return the helix manager properties loaded in the constructor
+   */
+  @Override
+  public HelixManagerProperties getHelixProperties() {
+    return _properties;
+  }
+
+  /**
+   * Wait until we are connected and have read a non-zero session-id. Note that we might lose
+   * the zk connection right after we read the session-id, but it's ok to get a stale
+   * session-id: we will have another handle-new-session callback to correct this.
+   * This method keeps retrying, one connection-timeout at a time, for as long as zookeeper
+   * cannot be reached.
+   */
+  private void waitUntilConnected() {
+    boolean isConnected;
+    // hex form of the session id read in the current iteration; null while not connected
+    String sessionIdStr = null;
+    do {
+      isConnected =
+          _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+      if (!isConnected) {
+        LOG.error("fail to connect zkserver: " + _zkAddr + " in "
+            + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId);
+        continue;
+      }
+
+      ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection());
+      sessionIdStr = Long.toHexString(zkConnection.getZookeeper().getSessionId());
+      _sessionId = SessionId.from(sessionIdStr);
+
+      /**
+       * at the time we read session-id, zkconnection might be lost again
+       * wait until we get a non-zero session-id.
+       * the check is done on the string form: comparing "0" against the SessionId object
+       * itself is never true, and "continue" above must lead to another attempt
+       */
+    } while (!isConnected || "0".equals(sessionIdStr));
+
+    LOG.info("Handling new session, session id: " + _sessionId + ", zkconnection: "
+        + ((ZkConnection) _zkclient.getConnection()).getZookeeper());
+  }
+
+  /**
+   * @return the session timeout resolved in the constructor from the
+   *         {@code zk.session.timeout} system property or the zkclient default
+   */
+  @Override
+  public int getSessionTimeout() {
+    return _sessionTimeout;
+  }
+
+  /**
+   * @return a new {@code DefaultMessagingService} operating on a {@code HelixManager}
+   *         adaptor created for the given role
+   */
+  @Override
+  public ClusterMessagingService createMessagingService(HelixRole role) {
+    // the messaging service expects a HelixManager, so wrap the role in an adaptor
+    HelixManager manager = new HelixConnectionAdaptor(role);
+    return new DefaultMessagingService(manager);
+  }
+
+  void addListener(HelixRole role, Object listener, PropertyKey propertyKey, ChangeType changeType,
+      EventType[] eventType) {
+    // checkConnected();
+    HelixManager manager = new HelixConnectionAdaptor(role);
+    PropertyType type = propertyKey.getType();
+
+    synchronized (this) {
+      if (!_handlers.containsKey(role)) {
+        _handlers.put(role, new CopyOnWriteArrayList<CallbackHandler>());
+      }
+      List<CallbackHandler> handlerList = _handlers.get(role);
+
+      for (CallbackHandler handler : handlerList) {
+        // compare property-key path and listener reference
+        if (handler.getPath().equals(propertyKey.getPath())
+            && handler.getListener().equals(listener)) {
+          LOG.info("role: " + role + ", listener: " + listener + " on path: "
+              + propertyKey.getPath() + " already exists. skip add");
+
+          return;
+        }
+      }
+
+      CallbackHandler newHandler =
+          new CallbackHandler(manager, _zkclient, propertyKey, listener, eventType, changeType);
+
+      handlerList.add(newHandler);
+      LOG.info("role: " + role + " added listener: " + listener + " for type: " + type
+          + " to path: " + newHandler.getPath());
+    }
+  }
+
+  void initHandlers(HelixRole role) {
+    synchronized (this) {
+      List<CallbackHandler> handlerList = _handlers.get(role);
+
+      if (handlerList != null) {
+        for (CallbackHandler handler : handlerList) {
+          handler.init();
+          LOG.info("role: " + role + ", init handler: " + handler.getPath() + ", "
+              + handler.getListener());
+        }
+      }
+    }
+  }
+
+  void resetHandlers(HelixRole role) {
+    synchronized (this) {
+      List<CallbackHandler> handlerList = _handlers.get(role);
+
+      if (handlerList != null) {
+        for (CallbackHandler handler : handlerList) {
+          handler.reset();
+          LOG.info("role: " + role + ", reset handler: " + handler.getPath() + ", "
+              + handler.getListener());
+        }
+      }
+    }
+  }
+
+  /**
+   * If zk state has changed into DISCONNECTED for _maxDisconnectThreshold times during
+   * _timeWindowLengthMs time window, it's flapping and we tear down the zk-connection
+   */
+  private boolean isFlapping() {
+    if (_disconnectTimeHistory.size() == 0) {
+      return false;
+    }
+    long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
+
+    // Remove disconnect history timestamp that are older than _flappingTimeWindowMs ago
+    while ((_disconnectTimeHistory.get(0) + _flappingTimeWindowMs) < mostRecentTimestamp) {
+      _disconnectTimeHistory.remove(0);
+    }
+    return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
new file mode 100644
index 0000000..b0c5a8b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -0,0 +1,236 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.HealthStatsAggregator;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.monitoring.StatusDumpTask;
+import org.apache.log4j.Logger;
+
+public class ZkHelixController implements HelixController {
+  private static Logger LOG = Logger.getLogger(ZkHelixController.class);
+
+  final ZkHelixConnection _connection;
+  final ClusterId _clusterId;
+  final ControllerId _controllerId;
+  final GenericHelixController _pipeline;
+  final DefaultMessagingService _messagingService;
+  final List<HelixTimerTask> _timerTasks;
+  final ClusterAccessor _clusterAccessor;
+  final HelixDataAccessor _accessor;
+  final HelixManager _manager;
+  final ZkHelixLeaderElection _leaderElection;
+
+  /**
+   * Creates a controller role on the given connection. Builds the controller pipeline, the
+   * accessors, the messaging service, a HelixManager adaptor and the leader election object,
+   * and prepares (but does not start) the timer tasks.
+   * @param connection connection this controller operates on
+   * @param clusterId cluster to control
+   * @param controllerId id of this controller
+   */
+  public ZkHelixController(ZkHelixConnection connection, ClusterId clusterId, ControllerId controllerId) {
+    _connection = connection;
+    _clusterId = clusterId;
+    _controllerId = controllerId;
+    _pipeline = new GenericHelixController();
+    _clusterAccessor = connection.createClusterAccessor(clusterId);
+    _accessor = connection.createDataAccessor(clusterId);
+
+    // note: "this" is handed out below while the object is still being constructed
+    _messagingService = (DefaultMessagingService) connection.createMessagingService(this);
+    _timerTasks = new ArrayList<HelixTimerTask>();
+
+    // _manager must be set before the leader election is created, which reads it via getManager()
+    _manager = new HelixConnectionAdaptor(this);
+    _leaderElection = new ZkHelixLeaderElection(this, _pipeline);
+
+    _timerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(_manager)));
+    _timerTasks.add(new StatusDumpTask(clusterId, _manager.getHelixDataAccessor()));
+  }
+
+  /**
+   * Starts every timer task of this controller, in the order they were added.
+   */
+  void startTimerTasks() {
+    for (HelixTimerTask task : _timerTasks) {
+      task.start();
+    }
+  }
+
+  /**
+   * Stops every timer task of this controller, in the order they were added.
+   */
+  void stopTimerTasks() {
+    for (HelixTimerTask task : _timerTasks) {
+      task.stop();
+    }
+  }
+
+  /**
+   * @return the connection this controller was created on
+   */
+  @Override
+  public HelixConnection getConnection() {
+    return _connection;
+  }
+
+  /**
+   * Registers this controller as a connection-state listener on the connection, then invokes
+   * {@link #onConnected()} directly on the calling thread.
+   */
+  @Override
+  public void startAsync() {
+    _connection.addConnectionStateListener(this);
+    onConnected();
+  }
+
+  /**
+   * Unregisters this controller as a connection-state listener, then invokes
+   * {@link #onDisconnecting()} directly on the calling thread.
+   */
+  @Override
+  public void stopAsync() {
+    _connection.removeConnectionStateListener(this);
+    onDisconnecting();
+  }
+
+  /**
+   * Resets all callback handlers registered for this controller on the connection.
+   */
+  void reset() {
+    /**
+     * reset all handlers, so that cleanup for the previous session is completed.
+     * NOTE(review): an earlier version of this comment said "disconnect if fail to cleanup";
+     * no such handling is visible here - confirm whether it is still intended
+     */
+    _connection.resetHandlers(this);
+
+  }
+
+  /**
+   * Prepares this controller for a new session: verifies the cluster structure, registers the
+   * leader-election listener and initializes all handlers registered for this controller.
+   * @throws HelixException if the cluster structure is not valid
+   */
+  void init() {
+    /**
+     * from here on, we are dealing with new session
+     * init handlers
+     */
+    if (!_clusterAccessor.isClusterStructureValid()) {
+      throw new HelixException("Cluster structure is not set up for cluster: " + _clusterId);
+    }
+
+    /**
+     * leader-election listener should be reset/init before all other controller listeners;
+     * it's ok to add a listener multiple times, since we check existence in
+     * ZkHelixConnection#addListener()
+     */
+    _connection.addControllerListener(this, _leaderElection, _clusterId);
+
+    /**
+     * ok to init message handler and controller handlers twice
+     * the second init will be skipped (see CallbackHandler)
+     */
+    _connection.initHandlers(this);
+  }
+
+  /**
+   * Handles a (re)connect by running {@code reset()} followed by {@code init()}.
+   */
+  @Override
+  public void onConnected() {
+    reset();
+    init();
+  }
+
+  /**
+   * Logs the disconnect and resets the handlers registered for this controller.
+   */
+  @Override
+  public void onDisconnecting() {
+    LOG.info("disconnecting " + _controllerId + "(" + getType() + ") from " + _clusterId);
+
+    reset();
+  }
+
+  /**
+   * @return the messaging service created for this controller in the constructor
+   */
+  @Override
+  public ClusterMessagingService getMessagingService() {
+    return _messagingService;
+  }
+
+  /**
+   * @return the id of the cluster this controller belongs to
+   */
+  @Override
+  public ClusterId getClusterId() {
+    return _clusterId;
+  }
+
+  /**
+   * @return the id of this controller
+   */
+  @Override
+  public ControllerId getControllerId() {
+    return _controllerId;
+  }
+
+  /**
+   * @return the controller id, same value as {@link #getControllerId()}
+   */
+  @Override
+  public Id getId() {
+    return getControllerId();
+  }
+
+  /**
+   * @return always {@link InstanceType#CONTROLLER}
+   */
+  @Override
+  public InstanceType getType() {
+    return InstanceType.CONTROLLER;
+  }
+
+  /**
+   * Checks whether this controller currently holds leadership, by reading the controller
+   * leader node and comparing both its instance name and its session id with this
+   * controller's id and the connection's current session id.
+   * @return true only if both match; false if there is no leader node, on any mismatch, or
+   *         if reading the leader node fails (the failure is logged)
+   */
+  @Override
+  public boolean isLeader() {
+    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+    try {
+      LiveInstance leader = _accessor.getProperty(keyBuilder.controllerLeader());
+      if (leader != null) {
+        String leaderName = leader.getInstanceName();
+        String sessionId = leader.getSessionId();
+        if (leaderName != null && leaderName.equals(_controllerId.stringify()) && sessionId != null
+            && sessionId.equals(_connection.getSessionId().stringify())) {
+          return true;
+        }
+      }
+    } catch (Exception e) {
+      // best effort: any failure is treated as "not leader", but it must not go unnoticed
+      LOG.warn("Exception checking leadership of " + _controllerId + " in " + _clusterId, e);
+    }
+    return false;
+  }
+
+  /**
+   * Registers the controller message listener and the default message handler factories,
+   * then registers the given pipeline as config, live-instance, ideal-state and controller
+   * listener. Exceptions are logged and not rethrown.
+   * @param pipeline the generic controller to register as listener
+   */
+  void addListenersToController(GenericHelixController pipeline) {
+    try {
+      /**
+       * setup controller message listener and register message handlers
+       */
+      _connection.addControllerMessageListener(this, _messagingService.getExecutor(),
+          _clusterId);
+      MessageHandlerFactory defaultControllerMsgHandlerFactory =
+          new DefaultControllerMessageHandlerFactory();
+      _messagingService.getExecutor().registerMessageHandlerFactory(
+          defaultControllerMsgHandlerFactory.getMessageType(), defaultControllerMsgHandlerFactory);
+      MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
+          new DefaultSchedulerMessageHandlerFactory(_manager);
+      _messagingService.getExecutor().registerMessageHandlerFactory(
+          defaultSchedulerMsgHandlerFactory.getMessageType(), defaultSchedulerMsgHandlerFactory);
+      MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
+          new DefaultParticipantErrorMessageHandlerFactory(_manager);
+      _messagingService.getExecutor().registerMessageHandlerFactory(
+          defaultParticipantErrorMessageHandlerFactory.getMessageType(),
+          defaultParticipantErrorMessageHandlerFactory);
+
+      /**
+       * setup generic-controller
+       */
+      _connection.addConfigChangeListener(this, pipeline, _clusterId);
+      _connection.addLiveInstanceChangeListener(this, pipeline, _clusterId);
+      _connection.addIdealStateChangeListener(this, pipeline, _clusterId);
+      _connection.addControllerListener(this, pipeline, _clusterId);
+    } catch (ZkInterruptedException e) {
+      // pass the exception as the throwable so the stack trace is kept, instead of
+      // gluing its toString() onto the message
+      LOG.warn("zk connection is interrupted during addListenersToController()", e);
+    } catch (Exception e) {
+      LOG.error("Error addListenersToController", e);
+    }
+  }
+
+  /**
+   * Removes the listeners added by addListenersToController(): the pipeline's listeners on
+   * the instance-configs, live-instances, ideal-states and controller paths, and the
+   * messaging executor's listener on the controller-messages path.
+   * @param pipeline the generic controller whose listeners are removed
+   */
+  void removeListenersFromController(GenericHelixController pipeline) {
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(_manager.getClusterName());
+    /**
+     * reset generic-controller
+     */
+    _connection.removeListener(this, pipeline, keyBuilder.instanceConfigs());
+    _connection.removeListener(this, pipeline, keyBuilder.liveInstances());
+    _connection.removeListener(this, pipeline, keyBuilder.idealStates());
+    _connection.removeListener(this, pipeline, keyBuilder.controller());
+
+    /**
+     * reset controller message listener.
+     * NOTE(review): only the listener is removed here; the message handler factories
+     * registered in addListenersToController() are not unregistered in this method - confirm
+     */
+    _connection.removeListener(this, _messagingService.getExecutor(),
+        keyBuilder.controllerMessages());
+  }
+
+  /**
+   * @return the HelixManager adaptor created for this controller in the constructor
+   */
+  HelixManager getManager() {
+    return _manager;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
new file mode 100644
index 0000000..06e8cd8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -0,0 +1,148 @@
+package org.apache.helix.manager.zk;
+
+import java.lang.management.ManagementFactory;
+
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.model.LeaderHistory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.log4j.Logger;
+
+// TODO GenericHelixController has a controller-listener, we can invoke leader-election from there
+/**
+ * Controller-change listener that runs leader election on behalf of a ZkHelixController.
+ * When leadership is acquired it attaches the controller pipeline's listeners and starts the
+ * controller's timer tasks; on FINALIZE it stops and detaches them again.
+ */
+public class ZkHelixLeaderElection implements ControllerChangeListener {
+  private static Logger LOG = Logger.getLogger(ZkHelixLeaderElection.class);
+
+  // controller whose listeners and timer tasks are toggled as leadership changes
+  final ZkHelixController _controller;
+  // cluster and controller ids, taken from the controller; used in log messages
+  final ClusterId _clusterId;
+  final ControllerId _controllerId;
+  // manager obtained from the controller; source of the data accessor, session id and version
+  final HelixManager _manager;
+  // pipeline passed to the controller when listeners are added or removed
+  final GenericHelixController _pipeline;
+
+  /**
+   * Creates a leader-election listener for the given controller.
+   * @param controller controller that supplies the cluster id, controller id and manager
+   * @param pipeline pipeline whose listeners are attached or detached on leadership changes
+   */
+  public ZkHelixLeaderElection(ZkHelixController controller, GenericHelixController pipeline) {
+    _controller = controller;
+    _clusterId = controller.getClusterId();
+    _controllerId = controller.getControllerId();
+    _pipeline = pipeline;
+    _manager = controller.getManager();
+  }
+
+  /**
+   * may be accessed by multiple threads: zk-client thread and
+   * ZkHelixManager.disconnect()->reset() TODO: Refactor accessing
+   * HelixMangerMain class statically
+   */
+  @Override
+  public synchronized void onControllerChange(NotificationContext changeContext) {
+    HelixManager manager = changeContext.getManager();
+    if (manager == null) {
+      LOG.error("missing attributes in changeContext. requires HelixManager");
+      return;
+    }
+
+    InstanceType type = _manager.getInstanceType();
+    if (type != InstanceType.CONTROLLER && type != InstanceType.CONTROLLER_PARTICIPANT) {
+      LOG.error("fail to become controller because incorrect instanceType (was " + type.toString()
+          + ", requires CONTROLLER | CONTROLLER_PARTICIPANT)");
+      return;
+    }
+
+    try {
+      if (changeContext.getType().equals(NotificationContext.Type.INIT)
+          || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
+        LOG.info(_controllerId + " is trying to acquire leadership for cluster: " + _clusterId);
+        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+        while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
+          boolean success = tryUpdateController(_manager);
+          if (success) {
+            LOG.info(_controllerId + " acquires leadership of cluster: " + _clusterId);
+
+            updateHistory(_manager);
+            _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
+            _controller.addListenersToController(_pipeline);
+            _controller.startTimerTasks();
+          }
+        }
+      } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
+        LOG.info(_controllerId + " reqlinquishes leadership of cluster: " + _clusterId);
+        _controller.stopTimerTasks();
+        _controller.removeListenersFromController(_pipeline);
+
+        /**
+         * clear write-through cache
+         */
+        _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
+      }
+
+    } catch (Exception e) {
+      LOG.error("Exception when trying to become leader", e);
+    }
+  }
+
+  /**
+   * Attempts to make the given manager's instance the controller leader by creating the
+   * controller-leader property.
+   * @param manager supplies the instance name, session id, version and data accessor
+   * @return true if this call created the leader property, or if the leader record read back
+   *         afterwards carries this manager's session id; false otherwise
+   */
+  private boolean tryUpdateController(HelixManager manager) {
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keys = dataAccessor.keyBuilder();
+
+    LiveInstance candidate = new LiveInstance(manager.getInstanceName());
+    try {
+      candidate.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+      candidate.setSessionId(manager.getSessionId());
+      candidate.setHelixVersion(manager.getVersion());
+      if (ZKPropertyTransferServer.getInstance() == null) {
+        LOG.warn("ZKPropertyTransferServer instnace is null");
+      } else {
+        String transferServiceUrl = ZKPropertyTransferServer.getInstance().getWebserviceUrl();
+        if (transferServiceUrl != null) {
+          candidate.setWebserviceUrl(transferServiceUrl);
+        }
+      }
+      if (dataAccessor.createProperty(keys.controllerLeader(), candidate)) {
+        return true;
+      }
+      LOG.info("Unable to become leader probably because some other controller becames the leader");
+    } catch (Exception e) {
+      LOG.error(
+          "Exception when trying to updating leader record in cluster:" + manager.getClusterName()
+              + ". Need to check again whether leader node has been created or not", e);
+    }
+
+    // creation failed or threw: read the leader record back and compare session ids
+    LiveInstance currentLeader = dataAccessor.getProperty(keys.controllerLeader());
+    if (currentLeader == null) {
+      return false;
+    }
+    String leaderSessionId = currentLeader.getSessionId();
+    LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
+        + currentLeader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
+    return leaderSessionId != null && leaderSessionId.equals(manager.getSessionId());
+  }
+
+  /**
+   * Appends the manager's cluster name and instance name to the controller leader history,
+   * creating the history record first when none is stored yet.
+   * @param manager supplies the data accessor, cluster name and instance name
+   */
+  private void updateHistory(HelixManager manager) {
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    PropertyKey historyKey = dataAccessor.keyBuilder().controllerLeaderHistory();
+
+    LeaderHistory leaderHistory = dataAccessor.getProperty(historyKey);
+    if (leaderHistory == null) {
+      leaderHistory = new LeaderHistory(PropertyType.HISTORY.toString());
+    }
+    leaderHistory.updateHistory(manager.getClusterName(), manager.getInstanceName());
+    dataAccessor.setProperty(historyKey, leaderHistory);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6210b0d2/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
new file mode 100644
index 0000000..eba96c9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -0,0 +1,475 @@
+package org.apache.helix.manager.zk;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceInfoProvider;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.accessor.ParticipantAccessor;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.healthcheck.ParticipantHealthReportTask;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * HelixParticipant implementation on top of a ZkHelixConnection. On connect it joins the
+ * cluster, creates its live-instance node, carries over current states from previous sessions,
+ * and sets up message handling and timer tasks.
+ */
+public class ZkHelixParticipant implements HelixParticipant {
+  private static Logger LOG = Logger.getLogger(ZkHelixParticipant.class);
+
+  // connection this participant runs on, and the identity it runs under
+  final ZkHelixConnection _connection;
+  final ClusterId _clusterId;
+  final ParticipantId _participantId;
+  // accessors created from the connection for this cluster
+  final ZKHelixDataAccessor _accessor;
+  final BaseDataAccessor<ZNRecord> _baseAccessor;
+  final PropertyKey.Builder _keyBuilder;
+  final ConfigAccessor _configAccessor;
+  final ClusterAccessor _clusterAccessor;
+  final ParticipantAccessor _participantAccessor;
+  final DefaultMessagingService _messagingService;
+  // callbacks invoked during init() before the live-instance node is created
+  final List<PreConnectCallback> _preConnectCallbacks;
+  // tasks started in init() and stopped in reset()
+  final List<HelixTimerTask> _timerTasks;
+  final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
+
+  /**
+   * state-transition message handler factory for helix-participant
+   */
+  final StateMachineEngine _stateMachineEngine;
+
+  // optional provider of extra live-instance fields; null until set by the caller
+  LiveInstanceInfoProvider _liveInstanceInfoProvider;
+
+  /**
+   * Builds a participant for the given cluster on top of an existing connection. All accessors
+   * and the messaging service are created from the connection; a health-report timer task is
+   * registered but not started here.
+   * @param connection connection used to create accessors and the messaging service
+   * @param clusterId cluster this participant belongs to
+   * @param participantId identity of this participant
+   */
+  public ZkHelixParticipant(ZkHelixConnection connection, ClusterId clusterId,
+      ParticipantId participantId) {
+    // identity first, so it is in place before 'this' is handed to collaborators below
+    _connection = connection;
+    _clusterId = clusterId;
+    _participantId = participantId;
+
+    _accessor = (ZKHelixDataAccessor) connection.createDataAccessor(clusterId);
+    _baseAccessor = _accessor.getBaseDataAccessor();
+    _keyBuilder = _accessor.keyBuilder();
+    _clusterAccessor = connection.createClusterAccessor(clusterId);
+    _participantAccessor = connection.createParticipantAccessor(clusterId);
+    _configAccessor = connection.getConfigAccessor();
+
+    _messagingService = (DefaultMessagingService) connection.createMessagingService(this);
+
+    // components that still expect a HelixManager get an adaptor around this participant
+    HelixManager managerAdaptor = new HelixConnectionAdaptor(this);
+    _stateMachineEngine = new HelixStateMachineEngine(managerAdaptor);
+    _preConnectCallbacks = new ArrayList<PreConnectCallback>();
+    _timerTasks = new ArrayList<HelixTimerTask>();
+    _participantHealthInfoCollector =
+        new ParticipantHealthReportCollectorImpl(managerAdaptor, participantId.stringify());
+
+    _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
+  }
+
+  /**
+   * @return the id of the cluster this participant was created for
+   */
+  @Override
+  public ClusterId getClusterId() {
+    return _clusterId;
+  }
+
+  /**
+   * @return the id of this participant
+   */
+  @Override
+  public ParticipantId getParticipantId() {
+    return _participantId;
+  }
+
+  /**
+   * @return the connection this participant was created with
+   */
+  @Override
+  public HelixConnection getConnection() {
+    return _connection;
+  }
+
+  /**
+   * Starts every registered timer task, in registration order.
+   */
+  void startTimerTasks() {
+    for (HelixTimerTask task : _timerTasks) {
+      task.start();
+    }
+  }
+
+  /**
+   * Stops every registered timer task, in registration order.
+   */
+  void stopTimerTasks() {
+    for (HelixTimerTask task : _timerTasks) {
+      task.stop();
+    }
+  }
+
+  /**
+   * Returns this participant to a clean state: stops timer tasks, resets the handlers the
+   * connection holds for this participant, and clears the data accessor's cache.
+   */
+  void reset() {
+    /**
+     * stop timer tasks and reset all handlers registered for this participant
+     * NOTE(review): the original comment also mentioned verifying cleanup of the previous
+     * session and disconnecting on failure; nothing in this method does that - confirm whether
+     * resetHandlers() covers it
+     */
+    stopTimerTasks();
+    _connection.resetHandlers(this);
+
+    /**
+     * clear write-through cache
+     */
+    _accessor.getBaseDataAccessor().reset();
+  }
+
+  /**
+   * Creates the ephemeral live-instance node for this participant, carrying the connection's
+   * session id, the Helix version and the runtime MXBean name. If a LiveInstanceInfoProvider is
+   * set and returns a record, the live-instance record is merged into it before creation.
+   * <p>
+   * When the create fails because a node already exists:
+   * <ul>
+   * <li>node gone by the time it is read: retry the create</li>
+   * <li>node owned by this session (ephemeral owner in hex equals the session id): rewrite its
+   * session-id field if it differs</li>
+   * <li>node owned by another session: sleep for session timeout + 5000 ms, then make exactly
+   * one more create attempt</li>
+   * </ul>
+   * @throws HelixException if the session-id rewrite fails its version check, or if the final
+   *           create attempt fails
+   */
+  private void createLiveInstance() {
+    String liveInstancePath = _keyBuilder.liveInstance(_participantId.stringify()).getPath();
+    String sessionId = _connection.getSessionId().stringify();
+    LiveInstance liveInstance = new LiveInstance(_participantId.stringify());
+    liveInstance.setSessionId(sessionId);
+    liveInstance.setHelixVersion(_connection.getHelixVersion());
+    liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+
+    if (_liveInstanceInfoProvider != null) {
+      LOG.info("Additional live instance information is provided: " + _liveInstanceInfoProvider);
+      ZNRecord additionalLiveInstanceInfo =
+          _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
+      if (additionalLiveInstanceInfo != null) {
+        // merge the standard fields into the provided record, then re-key it by participant id
+        additionalLiveInstanceInfo.merge(liveInstance.getRecord());
+        ZNRecord mergedLiveInstance =
+            new ZNRecord(additionalLiveInstanceInfo, _participantId.stringify());
+        liveInstance = new LiveInstance(mergedLiveInstance);
+        LOG.info("Participant: " + _participantId + ", mergedLiveInstance: " + liveInstance);
+      }
+    }
+
+    // 'retry' doubles as a flag after the loop: it is still true only when the loop was left
+    // through the break below, which requests the final create attempt
+    boolean retry;
+    do {
+      retry = false;
+      boolean success =
+          _baseAccessor.create(liveInstancePath, liveInstance.getRecord(), AccessOption.EPHEMERAL);
+      if (!success) {
+        LOG.warn("found another participant with same name: " + _participantId + " in cluster "
+            + _clusterId);
+
+        Stat stat = new Stat();
+        ZNRecord record = _baseAccessor.get(liveInstancePath, stat, 0);
+        if (record == null) {
+          /**
+           * live-instance is gone as we check it, retry create live-instance
+           */
+          retry = true;
+        } else {
+          String ephemeralOwner = Long.toHexString(stat.getEphemeralOwner());
+          if (ephemeralOwner.equals(sessionId)) {
+            /**
+             * update sessionId field in live-instance if necessary
+             */
+            LiveInstance curLiveInstance = new LiveInstance(record);
+            if (!curLiveInstance.getSessionId().equals(sessionId)) {
+              /**
+               * in last handle-new-session,
+               * live-instance is created by new zkconnection with stale session-id inside
+               * just update session-id field
+               */
+              LOG.info("overwriting session-id by ephemeralOwner: " + ephemeralOwner
+                  + ", old-sessionId: " + curLiveInstance.getSessionId() + ", new-sessionId: "
+                  + sessionId);
+
+              curLiveInstance.setSessionId(sessionId);
+              // versioned write: fails if the node changed since it was read above
+              success =
+                  _baseAccessor.set(liveInstancePath, curLiveInstance.getRecord(),
+                      stat.getVersion(), AccessOption.EPHEMERAL);
+              if (!success) {
+                LOG.error("Someone changes sessionId as we update, should not happen");
+                throw new HelixException("fail to create live-instance for " + _participantId);
+              }
+            }
+          } else {
+            /**
+             * wait for a while, in case previous helix-participant exits unexpectedly
+             * and its live-instance still hangs around until session timeout
+             */
+            try {
+              TimeUnit.MILLISECONDS.sleep(_connection.getSessionTimeout() + 5000);
+            } catch (InterruptedException ex) {
+              // NOTE(review): the interrupt status is not restored here - confirm whether
+              // callers need Thread.currentThread().interrupt()
+              LOG.warn("Sleep interrupted while waiting for previous live-instance to go away", ex);
+            }
+            /**
+             * give a last try after exit while loop
+             */
+            retry = true;
+            break;
+          }
+        }
+      }
+    } while (retry);
+
+    /**
+     * give a last shot
+     */
+    if (retry) {
+      boolean success =
+          _baseAccessor.create(liveInstancePath, liveInstance.getRecord(), AccessOption.EPHEMERAL);
+      if (!success) {
+        LOG.error("instance: " + _participantId + " already has a live-instance in cluster "
+            + _clusterId);
+        throw new HelixException("fail to create live-instance for " + _participantId);
+      }
+    }
+  }
+
+  /**
+   * Carries over current-states from previous sessions into the current session, then removes
+   * the current-state nodes of those previous sessions.
+   * <p>
+   * For every resource found under an older session, the current session's current-state node
+   * is updated through a CurStateCarryOverUpdater that is given the state model's initial state
+   * (per the original note: the initial state is applied only when the state doesn't exist in
+   * the current session). A resource is skipped with an error log, instead of aborting the whole
+   * carry-over, when its previous current-state has no state-model-definition reference or when
+   * the referenced definition cannot be read.
+   */
+  private void carryOverPreviousCurrentState() {
+    String sessionId = _connection.getSessionId().stringify();
+    String participantName = _participantId.stringify();
+    List<String> sessions = _accessor.getChildNames(_keyBuilder.sessions(participantName));
+
+    for (String session : sessions) {
+      if (session.equals(sessionId)) {
+        continue;
+      }
+
+      List<CurrentState> lastCurStates =
+          _accessor.getChildValues(_keyBuilder.currentStates(participantName, session));
+
+      for (CurrentState lastCurState : lastCurStates) {
+        LOG.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId()
+            + " to current session: " + sessionId);
+        String stateModelDefRef = lastCurState.getStateModelDefRef();
+        if (stateModelDefRef == null) {
+          LOG.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
+              + lastCurState);
+          continue;
+        }
+        StateModelDefinition stateModel =
+            _accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef));
+        if (stateModel == null) {
+          // the referenced definition cannot be read, so there is no initial state to carry
+          // over with; skip this resource rather than fail with a NullPointerException
+          LOG.error("skip carry-over because state model definition: " + stateModelDefRef
+              + " doesn't exist. previous current-state: " + lastCurState);
+          continue;
+        }
+
+        String curStatePath =
+            _keyBuilder.currentState(participantName, sessionId, lastCurState.getResourceName())
+                .getPath();
+        _accessor.getBaseDataAccessor().update(
+            curStatePath,
+            new CurStateCarryOverUpdater(sessionId, stateModel.getInitialState(), lastCurState),
+            AccessOption.PERSISTENT);
+      }
+    }
+
+    /**
+     * remove previous current state parent nodes
+     */
+    for (String session : sessions) {
+      if (session.equals(sessionId)) {
+        continue;
+      }
+
+      PropertyKey key = _keyBuilder.currentStates(participantName, session);
+      LOG.info("Removing current states from previous sessions. path: " + key.getPath());
+      _accessor.removeProperty(key);
+    }
+  }
+
+  /**
+   * Reads the cluster config to see whether instances may auto-join, then makes sure this
+   * participant's structure exists in the cluster.
+   * <p>
+   * If the structure is missing and auto-join is allowed, the participant is added with a host
+   * name and port derived from its id: the text after the last '_' is the port when it parses
+   * as an integer (otherwise -1), and the text before it is the host name. An id without a '_'
+   * after its first character is used as the host name unchanged.
+   * @throws HelixException if the participant structure is missing and auto-join is not allowed
+   */
+  private void joinCluster() {
+    boolean autoJoin = false;
+    try {
+      HelixConfigScope scope =
+          new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
+              _clusterId.stringify()).build();
+      autoJoin =
+          Boolean
+              .parseBoolean(_configAccessor.get(scope, HelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
+      LOG.info("instance: " + _participantId + " auto-joining " + _clusterId + " is " + autoJoin);
+    } catch (Exception e) {
+      // best effort: autoJoin stays false, but the failure is logged instead of hidden
+      LOG.warn("Unable to read " + HelixManager.ALLOW_PARTICIPANT_AUTO_JOIN + " for cluster "
+          + _clusterId + ", treating auto-join as disabled", e);
+    }
+
+    if (!_participantAccessor.isParticipantStructureValid(_participantId)) {
+      if (!autoJoin) {
+        throw new HelixException("Initial cluster structure is not set up for instance: "
+            + _participantId + ", instanceType: " + getType());
+      } else {
+        LOG.info(_participantId + " is auto-joining cluster: " + _clusterId);
+        String participantName = _participantId.stringify();
+        String hostName = participantName;
+        int port = -1;
+        int lastPos = participantName.lastIndexOf("_");
+        if (lastPos > 0) {
+          hostName = participantName.substring(0, lastPos);
+          try {
+            port = Integer.parseInt(participantName.substring(lastPos + 1));
+          } catch (NumberFormatException e) {
+            // suffix is not a number: use port = -1
+          }
+        }
+        ParticipantConfig.Builder builder =
+            new ParticipantConfig.Builder(_participantId).hostName(hostName).port(port)
+                .enabled(true);
+        _clusterAccessor.addParticipantToCluster(builder.build());
+      }
+    }
+  }
+
+  /**
+   * Wires up message handling: registers the state machine engine for STATE_TRANSITION
+   * messages, adds the message and controller listeners on the connection, and registers the
+   * scheduled-task state model factory with the state machine engine.
+   */
+  private void setupMsgHandler() {
+    _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
+        _stateMachineEngine);
+
+    /**
+     * it's ok to add a listener multiple times, since we check existence in
+     * ZkHelixConnection#addXXXListner()
+     */
+    _connection.addMessageListener(this, _messagingService.getExecutor(), _clusterId,
+        _participantId);
+    _connection.addControllerListener(this, _accessor, _clusterId);
+
+    // state model factory for the scheduler task queue, backed by the messaging executor
+    ScheduledTaskStateModelFactory stStateModelFactory =
+        new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
+    _stateMachineEngine.registerStateModelFactory(
+        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
+  }
+
+  /**
+   * Creates the health-report property for this participant and logs its path when the
+   * creation succeeds.
+   * TODO move it to cluster-setup
+   */
+  private void createHealthCheckPath() {
+    PropertyKey healthReportsKey = _keyBuilder.healthReports(_participantId.stringify());
+    boolean created = _accessor.createProperty(healthReportsKey, null);
+    if (!created) {
+      return;
+    }
+    LOG.info("Created healthcheck info path: " + healthReportsKey.getPath());
+  }
+
+  /**
+   * Brings this participant up for a new session. In order: validates the cluster structure,
+   * joins the cluster, invokes the pre-connect callbacks, creates the live-instance node,
+   * carries over current states from previous sessions, sets up message handling, creates the
+   * health-check path, starts the timer tasks and initializes the connection's handlers for
+   * this participant.
+   * @throws HelixException if the cluster structure is not set up
+   */
+  void init() {
+    /**
+     * from here on, we are dealing with new session
+     */
+    if (!_clusterAccessor.isClusterStructureValid()) {
+      throw new HelixException("Cluster structure is not set up for cluster: " + _clusterId);
+    }
+
+    /**
+     * auto-join
+     */
+    joinCluster();
+
+    /**
+     * Invoke PreConnectCallbacks
+     */
+    for (PreConnectCallback callback : _preConnectCallbacks) {
+      callback.onPreConnect();
+    }
+
+    createLiveInstance();
+    carryOverPreviousCurrentState();
+
+    /**
+     * setup message listener
+     */
+    setupMsgHandler();
+
+    /**
+     * start health check timer task
+     */
+    createHealthCheckPath();
+    startTimerTasks();
+
+    /**
+     * init handlers
+     * ok to init message handler and data-accessor twice
+     * the second init will be skipped (see CallbackHandler)
+     */
+    _connection.initHandlers(this);
+  }
+
+  /**
+   * Connection-established callback: resets this participant, then initializes it again.
+   */
+  @Override
+  public void onConnected() {
+    reset();
+    init();
+  }
+
+  /**
+   * Disconnect callback: resets this participant, then shuts down the messaging executor and
+   * the data accessor.
+   */
+  @Override
+  public void onDisconnecting() {
+    LOG.info("disconnecting " + _participantId + "(" + getType() + ") from " + _clusterId);
+
+    reset();
+
+    /**
+     * shall we shutdown thread pool first to avoid reset() being invoked in the middle of state
+     * transition?
+     */
+    _messagingService.getExecutor().shutdown();
+    _accessor.shutdown();
+
+  }
+
+  /**
+   * Registers this participant as a connection-state listener and then runs onConnected()
+   * directly on the calling thread.
+   */
+  @Override
+  public void startAsync() {
+    _connection.addConnectionStateListener(this);
+    onConnected();
+  }
+
+  /**
+   * Unregisters this participant as a connection-state listener and then runs
+   * onDisconnecting() directly on the calling thread.
+   */
+  @Override
+  public void stopAsync() {
+    _connection.removeConnectionStateListener(this);
+    onDisconnecting();
+  }
+
+  /**
+   * @return the messaging service created for this participant
+   */
+  @Override
+  public ClusterMessagingService getMessagingService() {
+    return _messagingService;
+  }
+
+  /**
+   * @return the state machine engine that handles state-transition messages
+   */
+  @Override
+  public StateMachineEngine getStateMachineEngine() {
+    return _stateMachineEngine;
+  }
+
+  /**
+   * @return this participant's id, same value as getParticipantId()
+   */
+  @Override
+  public Id getId() {
+    return getParticipantId();
+  }
+
+  /**
+   * @return always InstanceType.PARTICIPANT
+   */
+  @Override
+  public InstanceType getType() {
+    return InstanceType.PARTICIPANT;
+  }
+
+  /**
+   * Registers a callback that init() invokes before the live-instance node is created.
+   * @param callback callback to add
+   */
+  @Override
+  public void addPreConnectCallback(PreConnectCallback callback) {
+    LOG.info("Adding preconnect callback: " + callback);
+    _preConnectCallbacks.add(callback);
+  }
+
+  /**
+   * Sets the provider consulted for additional live-instance information when the
+   * live-instance node is created.
+   * @param liveInstanceInfoProvider provider to use; replaces any previous one
+   */
+  @Override
+  public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
+    _liveInstanceInfoProvider = liveInstanceInfoProvider;
+  }
+
+  /**
+   * @return the data accessor created for this participant's cluster
+   */
+  public HelixDataAccessor getAccessor() {
+    return _accessor;
+  }
+
+  /**
+   * @return the cluster accessor created for this participant's cluster
+   */
+  public ClusterAccessor getClusterAccessor() {
+    return _clusterAccessor;
+  }
+
+  /**
+   * @return the participant accessor created for this participant's cluster
+   */
+  public ParticipantAccessor getParticipantAccessor() {
+    return _participantAccessor;
+  }
+
+}