You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:58 UTC
[50/53] [abbrv] git commit: [HELIX-259] add HelixConnection, rb=14728
[HELIX-259] add HelixConnection, rb=14728
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/c589fb8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/c589fb8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/c589fb8d
Branch: refs/heads/master
Commit: c589fb8decdc07516b49d8964bc95bd44e1d39b0
Parents: 5405df1
Author: zzhang <zz...@apache.org>
Authored: Tue Nov 5 17:55:45 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:38 2013 -0800
----------------------------------------------------------------------
.../org/apache/helix/HelixAutoController.java | 43 ++
.../java/org/apache/helix/HelixConnection.java | 253 ++++++++
.../helix/HelixConnectionStateListener.java | 13 +
.../java/org/apache/helix/HelixController.java | 18 +
.../java/org/apache/helix/HelixParticipant.java | 37 ++
.../main/java/org/apache/helix/HelixRole.java | 40 ++
.../java/org/apache/helix/HelixService.java | 16 +
.../helix/api/accessor/ClusterAccessor.java | 2 +-
.../manager/zk/HelixConnectionAdaptor.java | 296 +++++++++
.../helix/manager/zk/ZkBaseDataAccessor.java | 7 -
.../helix/manager/zk/ZkHelixAutoController.java | 114 ++++
.../helix/manager/zk/ZkHelixConnection.java | 605 +++++++++++++++++++
.../helix/manager/zk/ZkHelixController.java | 236 ++++++++
.../helix/manager/zk/ZkHelixLeaderElection.java | 148 +++++
.../helix/manager/zk/ZkHelixParticipant.java | 475 +++++++++++++++
.../apache/helix/monitoring/StatusDumpTask.java | 166 +++++
.../participant/HelixStateMachineEngine.java | 108 +++-
.../helix/participant/StateMachineEngine.java | 66 +-
.../statemachine/HelixStateModelFactory.java | 99 +++
.../HelixStateModelFactoryAdaptor.java | 17 +
.../statemachine/StateModelFactory.java | 4 +
.../helix/integration/TestHelixConnection.java | 151 +++++
22 files changed, 2878 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAutoController.java b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
new file mode 100644
index 0000000..7ad9218
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
@@ -0,0 +1,43 @@
+package org.apache.helix;
+
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.participant.StateMachineEngine;
+
+/**
+ * Autonomous controller: a helix-role that acts as both a controller and a participant
+ */
+public interface HelixAutoController extends HelixRole, HelixService, HelixConnectionStateListener {
+ /**
+ * Get the id of this controller
+ * @return controller id
+ */
+ ControllerId getControllerId();
+
+ /**
+ * Get the state machine engine of this auto-controller
+ * @return state machine engine
+ */
+ StateMachineEngine getStateMachineEngine();
+
+ /**
+ * Add a pre-connect callback
+ * @param callback the pre-connect callback to register
+ */
+ void addPreConnectCallback(PreConnectCallback callback);
+
+ /**
+ * Add a LiveInstanceInfoProvider that is invoked before creating liveInstance.<br/>
+ * This allows applications to provide additional information that will be published to zookeeper
+ * and become available for discovery<br/>
+ * @see LiveInstanceInfoProvider#getAdditionalLiveInstanceInfo()
+ * @param liveInstanceInfoProvider provider of the additional live-instance information
+ */
+ void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider);
+
+ /**
+ * Tell if this controller is the leader of its cluster
+ * @return true if this controller is currently the cluster leader, false otherwise
+ */
+ boolean isLeader();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/HelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnection.java b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
new file mode 100644
index 0000000..7551673
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
@@ -0,0 +1,253 @@
+package org.apache.helix;
+
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.accessor.ParticipantAccessor;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.store.HelixPropertyStore;
+
+/**
+ * Helix connection (aka helix manager): creates helix-roles, accessors and listeners for clusters
+ */
+public interface HelixConnection {
+
+ /**
+ * Start this connection
+ */
+ void connect();
+
+ /**
+ * Close this connection
+ */
+ void disconnect();
+
+ /**
+ * Test if this connection is started
+ * @return true if connection is started, false otherwise
+ */
+ boolean isConnected();
+
+ /**
+ * Get the session id
+ * @return session id of current connection
+ */
+ SessionId getSessionId();
+
+ /**
+ * Get the session timeout
+ * @return session timeout in milliseconds
+ */
+ int getSessionTimeout();
+
+ /**
+ * create a helix-participant
+ * @param clusterId cluster the participant belongs to
+ * @param participantId id of the participant
+ * @return helix-participant
+ */
+ HelixParticipant createParticipant(ClusterId clusterId, ParticipantId participantId);
+
+ /**
+ * create a helix-controller
+ * @param clusterId cluster the controller belongs to
+ * @param controllerId id of the controller
+ * @return helix-controller
+ */
+ HelixController createController(ClusterId clusterId, ControllerId controllerId);
+
+ /**
+ * create a cluster-accessor
+ * @param clusterId cluster to access
+ * @return cluster-accessor
+ */
+ ClusterAccessor createClusterAccessor(ClusterId clusterId);
+
+ /**
+ * create a resource accessor
+ * @param clusterId cluster whose resources are accessed
+ * @return resource accessor
+ */
+ ResourceAccessor createResourceAccessor(ClusterId clusterId);
+
+ /**
+ * create a participant accessor
+ * @param clusterId cluster whose participants are accessed
+ * @return participant-accessor
+ */
+ ParticipantAccessor createParticipantAccessor(ClusterId clusterId);
+
+ /**
+ * Create an admin interface to set up and modify clusters (name keeps HelixManager's "Managment" spelling)
+ * @return instantiated HelixAdmin
+ */
+ HelixAdmin createClusterManagmentTool();
+
+ /**
+ * create a default property-store for a cluster
+ * @param clusterId cluster the property-store belongs to
+ * @return property-store
+ */
+ HelixPropertyStore<ZNRecord> createPropertyStore(ClusterId clusterId);
+
+ /**
+ * create a data-accessor
+ * @param clusterId cluster to access
+ * @return data-accessor
+ */
+ HelixDataAccessor createDataAccessor(ClusterId clusterId);
+
+ /**
+ * get config accessor
+ * @deprecated TODO replace with new ConfigAccessor
+ * @return config accessor
+ */
+ @Deprecated
+ ConfigAccessor getConfigAccessor();
+
+ /**
+ * add ideal state change listener
+ * @param role the helix-role registering the listener
+ * @param listener
+ * @param clusterId
+ */
+ void addIdealStateChangeListener(HelixRole role, IdealStateChangeListener listener,
+ ClusterId clusterId);
+
+ /**
+ * add controller message listener
+ * @param role the helix-role registering the listener
+ * @param listener
+ * @param clusterId
+ */
+ void addControllerMessageListener(HelixRole role, MessageListener listener, ClusterId clusterId);
+
+ /**
+ * add controller listener
+ * @param role the helix-role registering the listener
+ * @param listener
+ * @param clusterId
+ */
+ void addControllerListener(HelixRole role, ControllerChangeListener listener, ClusterId clusterId);
+
+ /**
+ * add live-instance listener using this connection
+ * @param role the helix-role registering the listener
+ * @param listener
+ * @param clusterId
+ */
+ void addLiveInstanceChangeListener(HelixRole role, LiveInstanceChangeListener listener,
+ ClusterId clusterId);
+
+ /**
+ * add message listener
+ * @param role the helix-role registering the listener
+ * @param listener
+ * @param clusterId
+ * @param participantId participant whose messages are listened to
+ */
+ void addMessageListener(HelixRole role, MessageListener listener, ClusterId clusterId,
+ ParticipantId participantId);
+
+ /**
+ * add config change listener (deprecated; consider the instance-config or scoped variants)
+ * @param role the helix-role registering the listener
+ * @param listener
+ * @param clusterId
+ */
+ @Deprecated
+ void addConfigChangeListener(HelixRole role, ConfigChangeListener listener, ClusterId clusterId);
+
+ /**
+ * add instance config change listener
+ * @see InstanceConfigChangeListener#onInstanceConfigChange(java.util.List, NotificationContext)
+ * @param role the helix-role registering the listener
+ * @param listener
+ * @param clusterId
+ */
+ void addInstanceConfigChangeListener(HelixRole role, InstanceConfigChangeListener listener,
+ ClusterId clusterId);
+
+ /**
+ * add config change listener for a scope
+ * @see ScopedConfigChangeListener#onConfigChange(java.util.List, NotificationContext)
+ * @param role the helix-role registering the listener
+ * @param listener
+ * @param clusterId
+ * @param scope config scope to listen on
+ */
+ void addConfigChangeListener(HelixRole role, ScopedConfigChangeListener listener,
+ ClusterId clusterId, ConfigScopeProperty scope);
+
+ /**
+ * add current state change listener
+ * @param role the helix-role registering the listener
+ * @param listener
+ * @param clusterId
+ * @param participantId participant whose current states are listened to
+ * @param sessionId session of that participant
+ */
+ void addCurrentStateChangeListener(HelixRole role, CurrentStateChangeListener listener,
+ ClusterId clusterId, ParticipantId participantId, SessionId sessionId);
+
+ /** add health state change listener; see {@link HealthStateChangeListener#onHealthChange(String, java.util.List, NotificationContext)}
+ * @param role the helix-role registering the listener
+ * @param listener
+ * @param clusterId
+ * @param participantId participant whose health state is listened to
+ */
+ void addHealthStateChangeListener(HelixRole role, HealthStateChangeListener listener,
+ ClusterId clusterId, ParticipantId participantId);
+
+ /** add external view change listener; see {@link ExternalViewChangeListener#onExternalViewChange(java.util.List, NotificationContext)}
+ * @param role the helix-role registering the listener
+ * @param listener
+ * @param clusterId
+ */
+ void addExternalViewChangeListener(HelixRole role, ExternalViewChangeListener listener,
+ ClusterId clusterId);
+
+ /**
+ * remove a listener
+ * @param role the helix-role that registered the listener
+ * @param listener
+ * @param key
+ * @return whether the listener was removed (NOTE(review): confirm semantics in implementation)
+ */
+ boolean removeListener(HelixRole role, Object listener, PropertyKey key);
+
+ /**
+ * add connection state listener
+ * @param listener listener notified via onConnected / onDisconnecting
+ */
+ void addConnectionStateListener(HelixConnectionStateListener listener);
+
+ /**
+ * remove connection state listener
+ * @param listener listener previously added with addConnectionStateListener
+ */
+ void removeConnectionStateListener(HelixConnectionStateListener listener);
+
+ /**
+ * create messaging service using this connection
+ * @param role the helix-role the messaging service is created for
+ * @return messaging-service
+ */
+ ClusterMessagingService createMessagingService(HelixRole role);
+
+ /**
+ * get helix version
+ * @return helix version
+ */
+ String getHelixVersion();
+
+ /**
+ * get helix properties
+ * @return helix-properties
+ */
+ HelixManagerProperties getHelixProperties();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java b/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
new file mode 100644
index 0000000..13172d0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
@@ -0,0 +1,13 @@
+package org.apache.helix;
+
+public interface HelixConnectionStateListener { // registered via HelixConnection.addConnectionStateListener
+ /**
+ * called after connection is established
+ */
+ void onConnected();
+
+ /**
+ * called before the connection is disconnected
+ */
+ void onDisconnecting();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/HelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixController.java b/helix-core/src/main/java/org/apache/helix/HelixController.java
new file mode 100644
index 0000000..ce47e3d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixController.java
@@ -0,0 +1,18 @@
+package org.apache.helix;
+
+import org.apache.helix.api.id.ControllerId;
+
+public interface HelixController extends HelixRole, HelixService, HelixConnectionStateListener {
+ // Controller helix-role of a cluster; leadership is reported by isLeader()
+ /**
+ * Get the id of this controller
+ * @return controller id
+ */
+ ControllerId getControllerId();
+
+ /**
+ * Tell if this controller is the leader of its cluster
+ * @return true if this controller is currently the cluster leader, false otherwise
+ */
+ boolean isLeader();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixParticipant.java b/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
new file mode 100644
index 0000000..9002b15
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
@@ -0,0 +1,37 @@
+package org.apache.helix;
+
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.participant.StateMachineEngine;
+
+/**
+ * Helix participant role: exposes its state machine engine and live-instance hooks
+ */
+public interface HelixParticipant extends HelixRole, HelixService, HelixConnectionStateListener {
+ /**
+ * Get the id of this participant
+ * @return participant id
+ */
+ ParticipantId getParticipantId();
+
+ /**
+ * Get the state machine engine of this participant
+ * @return state machine engine
+ */
+ StateMachineEngine getStateMachineEngine();
+
+ /**
+ * Add a pre-connect callback
+ * @param callback the pre-connect callback to register
+ */
+ void addPreConnectCallback(PreConnectCallback callback);
+
+ /**
+ * Add a LiveInstanceInfoProvider that is invoked before creating liveInstance.<br/>
+ * This allows applications to provide additional information that will be published to zookeeper
+ * and become available for discovery<br/>
+ * @see LiveInstanceInfoProvider#getAdditionalLiveInstanceInfo()
+ * @param liveInstanceInfoProvider provider of the additional live-instance information
+ */
+ void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/HelixRole.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixRole.java b/helix-core/src/main/java/org/apache/helix/HelixRole.java
new file mode 100644
index 0000000..9e112d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixRole.java
@@ -0,0 +1,40 @@
+package org.apache.helix;
+
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.Id;
+
+/**
+ * helix-role: one of participant, controller, auto-controller
+ */
+public interface HelixRole {
+ /**
+ * Get the underlying connection
+ * @return helix-connection
+ */
+ HelixConnection getConnection();
+
+ /**
+ * Get the id of the cluster to which this role belongs
+ * @return cluster id
+ */
+ ClusterId getClusterId();
+
+ /**
+ * Get the id of this helix-role
+ * @return id
+ */
+ Id getId();
+
+ /**
+ * Get the helix-role type
+ * @return instance type of this role (e.g. CONTROLLER_PARTICIPANT for an auto-controller)
+ */
+ InstanceType getType();
+
+ /**
+ * Get the messaging-service
+ * @return messaging-service
+ */
+ ClusterMessagingService getMessagingService();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/HelixService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixService.java b/helix-core/src/main/java/org/apache/helix/HelixService.java
new file mode 100644
index 0000000..33cc8e5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixService.java
@@ -0,0 +1,16 @@
+package org.apache.helix;
+
+/**
+ * Operational (start/stop) methods of a helix role
+ */
+public interface HelixService {
+ /**
+ * Start the helix service (async by name; an implementation may do the work inline)
+ */
+ void startAsync();
+
+ /**
+ * Stop the helix service (async by name; an implementation may do the work inline)
+ */
+ void stopAsync();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 85b8432..80977ab 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -69,10 +69,10 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfiguration;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
-import org.testng.internal.annotations.Sets;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
public class ClusterAccessor {
private static Logger LOG = Logger.getLogger(ClusterAccessor.class);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
new file mode 100644
index 0000000..b58e4b2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
@@ -0,0 +1,296 @@
+package org.apache.helix.manager.zk;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixAutoController;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.HelixRole;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceConfigChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.LiveInstanceInfoProvider;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ScopedConfigChangeListener;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.log4j.Logger;
+
+/**
+ * Adapt a helix-connection, bound to one helix-role, to the HelixManager interface so it can be
+ * passed to callback-handler and notification-context
+ */
+public class HelixConnectionAdaptor implements HelixManager {
+ private static Logger LOG = Logger.getLogger(HelixConnectionAdaptor.class);
+
+ final HelixRole _role;
+ final HelixConnection _connection;
+ final ClusterId _clusterId;
+ final Id _instanceId; // id of the adapted role, reported as the instance name
+ final InstanceType _instanceType;
+ final HelixDataAccessor _accessor;
+ final ClusterMessagingService _messagingService;
+ final SessionId _sessionId; // captured once in the constructor
+
+ public HelixConnectionAdaptor(HelixRole role) {
+ _role = role;
+ _connection = role.getConnection();
+ _sessionId = _connection.getSessionId(); // NOTE(review): snapshot; stale if the connection later gets a new session
+ _clusterId = role.getClusterId();
+ _accessor = _connection.createDataAccessor(_clusterId);
+
+ _instanceId = role.getId();
+ _instanceType = role.getType();
+ _messagingService = role.getMessagingService();
+ }
+
+ @Override
+ public void connect() throws Exception {
+ _connection.connect();
+ }
+
+ @Override
+ public boolean isConnected() {
+ return _connection.isConnected();
+ }
+
+ @Override
+ public void disconnect() {
+ _connection.disconnect();
+ }
+
+ @Override
+ public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception {
+ _connection.addIdealStateChangeListener(_role, listener, _clusterId);
+ }
+
+ @Override
+ public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception {
+ _connection.addLiveInstanceChangeListener(_role, listener, _clusterId);
+ }
+
+ @Override
+ public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
+ _connection.addConfigChangeListener(_role, listener, _clusterId);
+ }
+
+ @Override
+ public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
+ throws Exception {
+ _connection.addInstanceConfigChangeListener(_role, listener, _clusterId);
+ }
+
+ @Override
+ public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
+ throws Exception {
+ _connection.addConfigChangeListener(_role, listener, _clusterId, scope);
+ }
+
+ @Override
+ public void addMessageListener(MessageListener listener, String instanceName) throws Exception {
+ _connection.addMessageListener(_role, listener, _clusterId, ParticipantId.from(instanceName));
+ }
+
+ @Override
+ public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
+ String instanceName, String sessionId) throws Exception {
+ _connection.addCurrentStateChangeListener(_role, listener, _clusterId,
+ ParticipantId.from(instanceName), SessionId.from(sessionId));
+ }
+
+ @Override
+ public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
+ throws Exception {
+ _connection.addHealthStateChangeListener(_role, listener, _clusterId,
+ ParticipantId.from(instanceName));
+ }
+
+ @Override
+ public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
+ _connection.addExternalViewChangeListener(_role, listener, _clusterId);
+ }
+
+ @Override
+ public void addControllerListener(ControllerChangeListener listener) {
+ _connection.addControllerListener(_role, listener, _clusterId);
+ }
+
+ @Override
+ public boolean removeListener(PropertyKey key, Object listener) {
+ return _connection.removeListener(_role, listener, key);
+ }
+
+ @Override
+ public HelixDataAccessor getHelixDataAccessor() {
+ return _accessor;
+ }
+
+ @Override
+ public ConfigAccessor getConfigAccessor() {
+ return _connection.getConfigAccessor();
+ }
+
+ @Override
+ public String getClusterName() {
+ return _clusterId.stringify();
+ }
+
+ @Override
+ public String getInstanceName() {
+ return _instanceId.stringify();
+ }
+
+ @Override
+ public String getSessionId() {
+ return _sessionId.stringify();
+ }
+
+ @Override
+ public long getLastNotificationTime() {
+ return 0; // notification time is not tracked by this adaptor; always 0
+ }
+
+ @Override
+ public HelixAdmin getClusterManagmentTool() {
+ return _connection.createClusterManagmentTool();
+ }
+
+ @Override
+ public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
+ return (ZkHelixPropertyStore<ZNRecord>) _connection.createPropertyStore(_clusterId); // unchecked cast: assumes a ZK-backed store — TODO confirm
+ }
+
+ @Override
+ public ClusterMessagingService getMessagingService() {
+ return _messagingService;
+ }
+
+ @Override
+ public ParticipantHealthReportCollector getHealthReportCollector() {
+ throw new UnsupportedOperationException(); // not supported by this adaptor
+ }
+
+ @Override
+ public InstanceType getInstanceType() {
+ return _instanceType;
+ }
+
+ @Override
+ public String getVersion() {
+ return _connection.getHelixVersion();
+ }
+
+ @Override
+ public HelixManagerProperties getProperties() {
+ return _connection.getHelixProperties();
+ }
+
+ @Override
+ public StateMachineEngine getStateMachineEngine() {
+ StateMachineEngine engine = null; // stays null for role types without a state-machine-engine
+ switch (_role.getType()) {
+ case PARTICIPANT:
+ HelixParticipant participant = (HelixParticipant) _role;
+ engine = participant.getStateMachineEngine();
+ break;
+ case CONTROLLER_PARTICIPANT:
+ HelixAutoController autoController = (HelixAutoController) _role;
+ engine = autoController.getStateMachineEngine();
+ break;
+ default:
+ LOG.info("helix manager type: " + _role.getType()
+ + " does NOT have state-machine-engine");
+ break;
+ }
+
+ return engine;
+ }
+
+ @Override
+ public boolean isLeader() {
+ boolean isLeader = false; // role types without leadership always report false
+ switch (_role.getType()) {
+ case CONTROLLER:
+ HelixController controller = (HelixController) _role;
+ isLeader = controller.isLeader();
+ break;
+ case CONTROLLER_PARTICIPANT:
+ HelixAutoController autoController = (HelixAutoController) _role;
+ isLeader = autoController.isLeader();
+ break;
+ default:
+ LOG.info("helix manager type: " + _role.getType() + " does NOT support leadership");
+ break;
+ }
+ return isLeader;
+ }
+
+ @Override
+ public void startTimerTasks() {
+ throw new UnsupportedOperationException(
+ "HelixConnectionAdaptor does NOT support start timer tasks");
+ }
+
+ @Override
+ public void stopTimerTasks() {
+ throw new UnsupportedOperationException(
+ "HelixConnectionAdaptor does NOT support stop timer tasks");
+ }
+
+ @Override
+ public void addPreConnectCallback(PreConnectCallback callback) {
+ switch (_role.getType()) { // only participant-capable roles accept the callback; others just log
+ case PARTICIPANT:
+ HelixParticipant participant = (HelixParticipant) _role;
+ participant.addPreConnectCallback(callback);
+ break;
+ case CONTROLLER_PARTICIPANT:
+ HelixAutoController autoController = (HelixAutoController) _role;
+ autoController.addPreConnectCallback(callback);
+ break;
+ default:
+ LOG.info("helix manager type: " + _role.getType()
+ + " does NOT support add pre-connect callback");
+ break;
+ }
+ }
+
+ @Override
+ public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
+ switch (_role.getType()) { // only participant-capable roles accept the provider; others just log
+ case PARTICIPANT:
+ HelixParticipant participant = (HelixParticipant) _role;
+ participant.setLiveInstanceInfoProvider(liveInstanceInfoProvider);
+ break;
+ case CONTROLLER_PARTICIPANT:
+ HelixAutoController autoController = (HelixAutoController) _role;
+ autoController.setLiveInstanceInfoProvider(liveInstanceInfoProvider);
+ break;
+ default:
+ LOG.info("helix manager type: " + _role.getType()
+ + " does NOT support set additional live instance information");
+ break;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 0b112cd..4c7798f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -710,13 +710,6 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
success[i] = (results.get(i)._retCode == RetCode.OK);
}
- for (int i = 0; i < paths.size(); i++) {
- String path = paths.get(i);
- T record = records.get(i);
- if (path.indexOf("EXTERNALVIEW") != -1) {
- System.out.println("path: " + path + ", record: " + record + ", success: " + success[i]);
- }
- }
return success;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
new file mode 100644
index 0000000..d9ea445
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
@@ -0,0 +1,114 @@
+package org.apache.helix.manager.zk;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.HelixAutoController;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceInfoProvider;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+public class ZkHelixAutoController implements HelixAutoController { // a ZkHelixParticipant + ZkHelixController on one connection
+ private static Logger LOG = Logger.getLogger(ZkHelixAutoController.class);
+
+ final ZkHelixConnection _connection;
+ final ClusterId _clusterId;
+ final ControllerId _controllerId;
+ final ZkHelixParticipant _participant;
+ final ZkHelixController _controller;
+
+ public ZkHelixAutoController(ZkHelixConnection connection, ClusterId clusterId,
+ ControllerId controllerId) {
+ _connection = connection;
+ _clusterId = clusterId;
+ _controllerId = controllerId;
+
+ _participant =
+ new ZkHelixParticipant(connection, clusterId, ParticipantId.from(controllerId.stringify())); // participant id reuses the controller id string
+ _controller = new ZkHelixController(connection, clusterId, controllerId);
+ }
+
+ @Override
+ public HelixConnection getConnection() {
+ return _connection;
+ }
+
+ @Override
+ public ClusterId getClusterId() {
+ return _clusterId;
+ }
+
+ @Override
+ public Id getId() {
+ return getControllerId();
+ }
+
+ @Override
+ public InstanceType getType() {
+ return InstanceType.CONTROLLER_PARTICIPANT;
+ }
+
+ @Override
+ public ClusterMessagingService getMessagingService() {
+ return _participant.getMessagingService(); // messaging comes from the participant side
+ }
+
+ @Override
+ public void startAsync() {
+ _connection.addConnectionStateListener(this);
+ onConnected(); // runs inline; NOTE(review): assumes _connection is already connected — confirm
+ }
+
+ @Override
+ public void stopAsync() {
+ _connection.removeConnectionStateListener(this);
+ onDisconnecting(); // tear down inline after unregistering
+ }
+
+ @Override
+ public void onConnected() {
+ _controller.reset(); // reset both roles before re-initializing them
+ _participant.reset();
+
+ _participant.init(); // participant is initialized before controller (rationale not shown here)
+ _controller.init();
+ }
+
+ @Override
+ public void onDisconnecting() {
+ LOG.info("disconnecting " + _controllerId + "(" + getType() + ") from " + _clusterId);
+ _controller.onDisconnecting(); // teardown runs in reverse of the init order in onConnected
+ _participant.onDisconnecting();
+ }
+
+ @Override
+ public ControllerId getControllerId() {
+ return _controllerId;
+ }
+
+ @Override
+ public StateMachineEngine getStateMachineEngine() {
+ return _participant.getStateMachineEngine();
+ }
+
+ @Override
+ public void addPreConnectCallback(PreConnectCallback callback) {
+ _participant.addPreConnectCallback(callback);
+ }
+
+ @Override
+ public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
+ _participant.setLiveInstanceInfoProvider(liveInstanceInfoProvider);
+ }
+
+ @Override
+ public boolean isLeader() {
+ return _controller.isLeader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
new file mode 100644
index 0000000..0717d77
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
@@ -0,0 +1,605 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixConnectionStateListener;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.HelixRole;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceConfigChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ScopedConfigChangeListener;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.accessor.ParticipantAccessor;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class ZkHelixConnection implements HelixConnection, IZkStateListener {
+ private static Logger LOG = Logger.getLogger(ZkHelixConnection.class);
+
+ final String _zkAddr;
+ final int _sessionTimeout;
+ SessionId _sessionId;
+ ZkClient _zkclient;
+ BaseDataAccessor<ZNRecord> _baseAccessor;
+ ConfigAccessor _configAccessor;
+ final Set<HelixConnectionStateListener> _connectionListener =
+ new CopyOnWriteArraySet<HelixConnectionStateListener>();
+
+ final Map<HelixRole, List<CallbackHandler>> _handlers;
+ final HelixManagerProperties _properties;
+
+ /**
+ * Keep track of timestamps that zk State has become Disconnected
+ * If in a _timeWindowLengthMs window zk State has become Disconnected
+ * for more than_maxDisconnectThreshold times disconnect the zkHelixManager
+ */
+ final List<Long> _disconnectTimeHistory = new ArrayList<Long>();
+ final int _flappingTimeWindowMs;
+ final int _maxDisconnectThreshold;
+
+ final ReentrantLock _lock = new ReentrantLock();
+
+ /**
+ * helix version#
+ */
+ final String _version;
+
+ public ZkHelixConnection(String zkAddr) {
+ _zkAddr = zkAddr;
+ _handlers = new HashMap<HelixRole, List<CallbackHandler>>();
+
+ /**
+ * use system property if available
+ */
+ _flappingTimeWindowMs =
+ getSystemPropertyAsInt("helixmanager.flappingTimeWindow",
+ ZKHelixManager.FLAPPING_TIME_WINDIOW);
+
+ _maxDisconnectThreshold =
+ getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold",
+ ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
+
+ _sessionTimeout =
+ getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
+
+ _properties = new HelixManagerProperties("cluster-manager-version.properties");
+ _version = _properties.getVersion();
+
+ }
+
+ private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
+ String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue);
+
+ try {
+ int value = Integer.parseInt(valueString);
+ if (value > 0) {
+ return value;
+ }
+ } catch (NumberFormatException e) {
+ LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
+ + ", using default value: " + propertyDefaultValue);
+ }
+
+ return propertyDefaultValue;
+ }
+
+ @Override
+ public void connect() {
+ boolean isStarted = false;
+ try {
+ _lock.lock();
+ _zkclient =
+ new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ // waitUntilConnected();
+
+ _baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
+ _configAccessor = new ConfigAccessor(_zkclient);
+
+ _zkclient.subscribeStateChanges(this);
+ handleNewSession();
+
+ isStarted = true;
+ } catch (Exception e) {
+ LOG.error("Exception connect", e);
+ } finally {
+ _lock.unlock();
+ if (!isStarted) {
+ disconnect();
+ }
+ }
+ }
+
+ @Override
+ public void disconnect() {
+ if (_zkclient == null) {
+ return;
+ }
+
+ LOG.info("Disconnecting connection: " + this);
+
+ try {
+ _lock.lock();
+ for (final HelixConnectionStateListener listener : _connectionListener) {
+ try {
+
+ listener.onDisconnecting();
+ } catch (Exception e) {
+ LOG.error("Exception in calling disconnect on listener: " + listener, e);
+ }
+ }
+ _zkclient.close();
+ _zkclient = null;
+ LOG.info("Disconnected connection: " + this);
+ } catch (Exception e) {
+ LOG.error("Exception disconnect", e);
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isConnected() {
+ try {
+ _lock.lock();
+ return _zkclient != null;
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ @Override
+ public HelixParticipant createParticipant(ClusterId clusterId, ParticipantId participantId) {
+ return new ZkHelixParticipant(this, clusterId, participantId);
+ }
+
+ @Override
+ public HelixController createController(ClusterId clusterId, ControllerId controllerId) {
+ return new ZkHelixController(this, clusterId, controllerId);
+ }
+
+ @Override
+ public ClusterAccessor createClusterAccessor(ClusterId clusterId) {
+ return new ClusterAccessor(clusterId, createDataAccessor(clusterId));
+ }
+
+ @Override
+ public ResourceAccessor createResourceAccessor(ClusterId clusterId) {
+ return new ResourceAccessor(createDataAccessor(clusterId));
+ }
+
+ @Override
+ public ParticipantAccessor createParticipantAccessor(ClusterId clusterId) {
+ return new ParticipantAccessor(createDataAccessor(clusterId));
+ }
+
+ @Override
+ public HelixAdmin createClusterManagmentTool() {
+ return new ZKHelixAdmin(_zkclient);
+ }
+
+ @Override
+ public HelixPropertyStore<ZNRecord> createPropertyStore(ClusterId clusterId) {
+ PropertyKey key = new PropertyKey.Builder(clusterId.stringify()).propertyStore();
+ return new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient),
+ key.getPath(), null);
+ }
+
+ @Override
+ public HelixDataAccessor createDataAccessor(ClusterId clusterId) {
+ return new ZKHelixDataAccessor(clusterId.stringify(), _baseAccessor);
+ }
+
+ @Override
+ public ConfigAccessor getConfigAccessor() {
+ return _configAccessor;
+ }
+
+ @Override
+ public void addControllerListener(HelixRole role, ControllerChangeListener listener,
+ ClusterId clusterId) {
+
+ addListener(role, listener, new PropertyKey.Builder(clusterId.stringify()).controller(),
+ ChangeType.CONTROLLER, new EventType[] {
+ EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+ });
+ }
+
+ @Override
+ public void addMessageListener(HelixRole role, MessageListener listener, ClusterId clusterId,
+ ParticipantId participantId) {
+
+ addListener(role, listener,
+ new PropertyKey.Builder(clusterId.stringify()).messages(participantId.stringify()),
+ ChangeType.MESSAGE, new EventType[] {
+ EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+ });
+ }
+
+ @Override
+ public void addControllerMessageListener(HelixRole role, MessageListener listener,
+ ClusterId clusterId) {
+
+ addListener(role, listener,
+ new PropertyKey.Builder(clusterId.stringify()).controllerMessages(),
+ ChangeType.MESSAGES_CONTROLLER, new EventType[] {
+ EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+ });
+ }
+
+ @Override
+ public void addIdealStateChangeListener(HelixRole role, IdealStateChangeListener listener,
+ ClusterId clusterId) {
+
+ addListener(role, listener, new PropertyKey.Builder(clusterId.stringify()).idealStates(),
+ ChangeType.IDEAL_STATE, new EventType[] {
+ EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated
+ });
+ }
+
+ @Override
+ public void addLiveInstanceChangeListener(HelixRole role, LiveInstanceChangeListener listener,
+ ClusterId clusterId) {
+
+ addListener(role, listener, new PropertyKey.Builder(clusterId.stringify()).liveInstances(),
+ ChangeType.LIVE_INSTANCE, new EventType[] {
+ EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted,
+ EventType.NodeCreated
+ });
+ }
+
+ @Override
+ public void addConfigChangeListener(HelixRole role, ConfigChangeListener listener,
+ ClusterId clusterId) {
+
+ addListener(role, listener, new PropertyKey.Builder(clusterId.stringify()).instanceConfigs(),
+ ChangeType.INSTANCE_CONFIG, new EventType[] {
+ EventType.NodeChildrenChanged
+ });
+ }
+
+ @Override
+ public void addInstanceConfigChangeListener(HelixRole role,
+ InstanceConfigChangeListener listener, ClusterId clusterId) {
+ addListener(role, listener, new PropertyKey.Builder(clusterId.stringify()).instanceConfigs(),
+ ChangeType.INSTANCE_CONFIG, new EventType[] {
+ EventType.NodeChildrenChanged
+ });
+ }
+
+ @Override
+ public void addConfigChangeListener(HelixRole role, ScopedConfigChangeListener listener,
+ ClusterId clusterId, ConfigScopeProperty scope) {
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterId.stringify());
+
+ PropertyKey propertyKey = null;
+ switch (scope) {
+ case CLUSTER:
+ propertyKey = keyBuilder.clusterConfigs();
+ break;
+ case PARTICIPANT:
+ propertyKey = keyBuilder.instanceConfigs();
+ break;
+ case RESOURCE:
+ propertyKey = keyBuilder.resourceConfigs();
+ break;
+ default:
+ break;
+ }
+
+ if (propertyKey == null) {
+ LOG.error("Failed to add listener: " + listener + ", unrecognized config scope: " + scope);
+ return;
+ }
+
+ addListener(role, listener, propertyKey, ChangeType.CONFIG, new EventType[] {
+ EventType.NodeChildrenChanged
+ });
+ }
+
+ @Override
+ public void addCurrentStateChangeListener(HelixRole role, CurrentStateChangeListener listener,
+ ClusterId clusterId, ParticipantId participantId, SessionId sessionId) {
+
+ addListener(role, listener, new PropertyKey.Builder(clusterId.stringify()).currentStates(
+ participantId.stringify(), sessionId.stringify()), ChangeType.CURRENT_STATE,
+ new EventType[] {
+ EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+ });
+ }
+
+ @Override
+ public void addHealthStateChangeListener(HelixRole role, HealthStateChangeListener listener,
+ ClusterId clusterId, ParticipantId participantId) {
+ addListener(role, listener,
+ new PropertyKey.Builder(clusterId.stringify()).healthReports(participantId.stringify()),
+ ChangeType.HEALTH,
+ new EventType[] {
+ EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+ });
+ }
+
+ @Override
+ public void addExternalViewChangeListener(HelixRole role, ExternalViewChangeListener listener,
+ ClusterId clusterId) {
+ addListener(role, listener, new PropertyKey.Builder(clusterId.stringify()).externalViews(),
+ ChangeType.EXTERNAL_VIEW, new EventType[] {
+ EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+ });
+ }
+
+ @Override
+ public boolean removeListener(HelixRole role, Object listener, PropertyKey key) {
+ LOG.info("role: " + role + " removing listener: " + listener + " on path: " + key.getPath()
+ + " from connection: " + this);
+ List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
+ List<CallbackHandler> handlerList = _handlers.get(role);
+ if (handlerList == null) {
+ return true;
+ }
+
+ synchronized (this) {
+ for (CallbackHandler handler : handlerList) {
+ // compare property-key path and listener reference
+ if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) {
+ toRemove.add(handler);
+ }
+ }
+
+ handlerList.removeAll(toRemove);
+ if (handlerList.isEmpty()) {
+ _handlers.remove(role);
+ }
+ }
+
+ // handler.reset() may modify the handlers list, so do it outside the iteration
+ for (CallbackHandler handler : toRemove) {
+ handler.reset();
+ }
+
+ return true;
+ }
+
+ @Override
+ public void addConnectionStateListener(HelixConnectionStateListener listener) {
+ synchronized (_connectionListener) {
+ _connectionListener.add(listener);
+ }
+ }
+
+ @Override
+ public void removeConnectionStateListener(HelixConnectionStateListener listener) {
+ synchronized (_connectionListener) {
+ _connectionListener.remove(listener);
+ }
+ }
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception {
+ try {
+ _lock.lock();
+
+ switch (state) {
+ case SyncConnected:
+ ZkConnection zkConnection = (ZkConnection) _zkclient.getConnection();
+ LOG.info("KeeperState: " + state + ", zookeeper:" + zkConnection.getZookeeper());
+ break;
+ case Disconnected:
+ LOG.info("KeeperState:" + state + ", disconnectedSessionId: " + _sessionId);
+
+ /**
+ * Track the time stamp that the disconnected happens, then check history and see if
+ * we should disconnect the helix-manager
+ */
+ _disconnectTimeHistory.add(System.currentTimeMillis());
+ if (isFlapping()) {
+ LOG.error("helix-connection: " + this + ", sessionId: " + _sessionId
+ + " is flapping. diconnect it. " + " maxDisconnectThreshold: "
+ + _maxDisconnectThreshold + " disconnects in " + _flappingTimeWindowMs + "ms");
+ disconnect();
+ }
+ break;
+ case Expired:
+ LOG.info("KeeperState:" + state + ", expiredSessionId: " + _sessionId);
+ break;
+ default:
+ break;
+ }
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ @Override
+ public void handleNewSession() throws Exception {
+ waitUntilConnected();
+
+ try {
+ _lock.lock();
+
+ for (final HelixConnectionStateListener listener : _connectionListener) {
+ try {
+ listener.onConnected();
+ } catch (Exception e) {
+ LOG.error("Exception invoking connect on listener: " + listener, e);
+ }
+ }
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ @Override
+ public SessionId getSessionId() {
+ return _sessionId;
+ }
+
+ @Override
+ public String getHelixVersion() {
+ return _version;
+ }
+
+ @Override
+ public HelixManagerProperties getHelixProperties() {
+ return _properties;
+ }
+
+ /**
+ * wait until we get a non-zero session-id. note that we might lose zkconnection
+ * right after we read session-id. but it's ok to get stale session-id and we will have
+ * another handle-new-session callback to correct this.
+ */
+ private void waitUntilConnected() {
+ boolean isConnected;
+ do {
+ isConnected =
+ _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!isConnected) {
+ LOG.error("fail to connect zkserver: " + _zkAddr + " in "
+ + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId);
+ continue;
+ }
+
+ ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection());
+ _sessionId = SessionId.from(Long.toHexString(zkConnection.getZookeeper().getSessionId()));
+
+ /**
+ * at the time we read session-id, zkconnection might be lost again
+ * wait until we get a non-zero session-id
+ */
+ } while ("0".equals(_sessionId));
+
+ LOG.info("Handling new session, session id: " + _sessionId + ", zkconnection: "
+ + ((ZkConnection) _zkclient.getConnection()).getZookeeper());
+ }
+
+ @Override
+ public int getSessionTimeout() {
+ return _sessionTimeout;
+ }
+
+ @Override
+ public ClusterMessagingService createMessagingService(HelixRole role) {
+ HelixManager manager = new HelixConnectionAdaptor(role);
+ return new DefaultMessagingService(manager);
+ }
+
+ void addListener(HelixRole role, Object listener, PropertyKey propertyKey, ChangeType changeType,
+ EventType[] eventType) {
+ // checkConnected();
+ HelixManager manager = new HelixConnectionAdaptor(role);
+ PropertyType type = propertyKey.getType();
+
+ synchronized (this) {
+ if (!_handlers.containsKey(role)) {
+ _handlers.put(role, new CopyOnWriteArrayList<CallbackHandler>());
+ }
+ List<CallbackHandler> handlerList = _handlers.get(role);
+
+ for (CallbackHandler handler : handlerList) {
+ // compare property-key path and listener reference
+ if (handler.getPath().equals(propertyKey.getPath())
+ && handler.getListener().equals(listener)) {
+ LOG.info("role: " + role + ", listener: " + listener + " on path: "
+ + propertyKey.getPath() + " already exists. skip add");
+
+ return;
+ }
+ }
+
+ CallbackHandler newHandler =
+ new CallbackHandler(manager, _zkclient, propertyKey, listener, eventType, changeType);
+
+ handlerList.add(newHandler);
+ LOG.info("role: " + role + " added listener: " + listener + " for type: " + type
+ + " to path: " + newHandler.getPath());
+ }
+ }
+
+ void initHandlers(HelixRole role) {
+ synchronized (this) {
+ List<CallbackHandler> handlerList = _handlers.get(role);
+
+ if (handlerList != null) {
+ for (CallbackHandler handler : handlerList) {
+ handler.init();
+ LOG.info("role: " + role + ", init handler: " + handler.getPath() + ", "
+ + handler.getListener());
+ }
+ }
+ }
+ }
+
+ void resetHandlers(HelixRole role) {
+ synchronized (this) {
+ List<CallbackHandler> handlerList = _handlers.get(role);
+
+ if (handlerList != null) {
+ for (CallbackHandler handler : handlerList) {
+ handler.reset();
+ LOG.info("role: " + role + ", reset handler: " + handler.getPath() + ", "
+ + handler.getListener());
+ }
+ }
+ }
+ }
+
+ /**
+ * If zk state has changed into DISCONNECTED for _maxDisconnectThreshold times during
+ * _timeWindowLengthMs time window, it's flapping and we tear down the zk-connection
+ */
+ private boolean isFlapping() {
+ if (_disconnectTimeHistory.size() == 0) {
+ return false;
+ }
+ long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
+
+ // Remove disconnect history timestamp that are older than _flappingTimeWindowMs ago
+ while ((_disconnectTimeHistory.get(0) + _flappingTimeWindowMs) < mostRecentTimestamp) {
+ _disconnectTimeHistory.remove(0);
+ }
+ return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
new file mode 100644
index 0000000..b0c5a8b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -0,0 +1,236 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.HealthStatsAggregator;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.monitoring.StatusDumpTask;
+import org.apache.log4j.Logger;
+
+/**
+ * Controller role on top of a {@link ZkHelixConnection}. Leadership is decided by
+ * {@link ZkHelixLeaderElection}; only the leader installs the pipeline listeners and runs the
+ * timer tasks.
+ */
+public class ZkHelixController implements HelixController {
+ private static final Logger LOG = Logger.getLogger(ZkHelixController.class);
+
+ final ZkHelixConnection _connection;
+ final ClusterId _clusterId;
+ final ControllerId _controllerId;
+ final GenericHelixController _pipeline;
+ final DefaultMessagingService _messagingService;
+ final List<HelixTimerTask> _timerTasks;
+ final ClusterAccessor _clusterAccessor;
+ final HelixDataAccessor _accessor;
+ final HelixManager _manager;
+ final ZkHelixLeaderElection _leaderElection;
+
+ /**
+ * @param connection connection shared with other roles
+ * @param clusterId cluster to control
+ * @param controllerId id of this controller
+ */
+ public ZkHelixController(ZkHelixConnection connection, ClusterId clusterId, ControllerId controllerId) {
+ _connection = connection;
+ _clusterId = clusterId;
+ _controllerId = controllerId;
+ _pipeline = new GenericHelixController();
+ _clusterAccessor = connection.createClusterAccessor(clusterId);
+ _accessor = connection.createDataAccessor(clusterId);
+
+ _messagingService = (DefaultMessagingService) connection.createMessagingService(this);
+ _timerTasks = new ArrayList<HelixTimerTask>();
+
+ // _manager must be assigned before the leader election is built: its constructor reads it
+ _manager = new HelixConnectionAdaptor(this);
+ _leaderElection = new ZkHelixLeaderElection(this, _pipeline);
+
+ _timerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(_manager)));
+ _timerTasks.add(new StatusDumpTask(clusterId, _manager.getHelixDataAccessor()));
+ }
+
+ /** start every timer task; called when this controller becomes leader */
+ void startTimerTasks() {
+ for (HelixTimerTask task : _timerTasks) {
+ task.start();
+ }
+ }
+
+ /** stop every timer task; called when this controller gives up leadership */
+ void stopTimerTasks() {
+ for (HelixTimerTask task : _timerTasks) {
+ task.stop();
+ }
+ }
+
+ @Override
+ public HelixConnection getConnection() {
+ return _connection;
+ }
+
+ @Override
+ public void startAsync() {
+ _connection.addConnectionStateListener(this);
+ onConnected();
+ }
+
+ @Override
+ public void stopAsync() {
+ _connection.removeConnectionStateListener(this);
+ onDisconnecting();
+ }
+
+ void reset() {
+ /**
+ * reset all handlers registered by this controller so that cleanup for the previous
+ * session completes before a new session is initialized
+ */
+ _connection.resetHandlers(this);
+
+ }
+
+ void init() {
+ /**
+ * from here on, we are dealing with new session
+ * init handlers
+ */
+ if (!_clusterAccessor.isClusterStructureValid()) {
+ throw new HelixException("Cluster structure is not set up for cluster: " + _clusterId);
+ }
+
+ /**
+ * leader-election listener should be reset/init before all other controller listeners;
+ * it's ok to add a listener multiple times, since we check existence in
+ * ZkHelixConnection#addXXXListner()
+ */
+ _connection.addControllerListener(this, _leaderElection, _clusterId);
+
+ /**
+ * ok to init message handler and controller handlers twice
+ * the second init will be skipped (see CallbackHandler)
+ */
+ _connection.initHandlers(this);
+ }
+
+ @Override
+ public void onConnected() {
+ reset();
+ init();
+ }
+
+ @Override
+ public void onDisconnecting() {
+ LOG.info("disconnecting " + _controllerId + "(" + getType() + ") from " + _clusterId);
+
+ reset();
+ }
+
+ @Override
+ public ClusterMessagingService getMessagingService() {
+ return _messagingService;
+ }
+
+ @Override
+ public ClusterId getClusterId() {
+ return _clusterId;
+ }
+
+ @Override
+ public ControllerId getControllerId() {
+ return _controllerId;
+ }
+
+ @Override
+ public Id getId() {
+ return getControllerId();
+ }
+
+ @Override
+ public InstanceType getType() {
+ return InstanceType.CONTROLLER;
+ }
+
+ /**
+ * @return true if the cluster's leader node names this controller and carries the current
+ * session id of the connection; false otherwise, including when no session exists yet
+ * or the leader node cannot be read
+ */
+ @Override
+ public boolean isLeader() {
+ if (_connection.getSessionId() == null) {
+ // no zk session has been established yet, so we cannot own the leader node
+ return false;
+ }
+
+ PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+ try {
+ LiveInstance leader = _accessor.getProperty(keyBuilder.controllerLeader());
+ if (leader != null) {
+ String leaderName = leader.getInstanceName();
+ String sessionId = leader.getSessionId();
+ if (leaderName != null && leaderName.equals(_controllerId.stringify()) && sessionId != null
+ && sessionId.equals(_connection.getSessionId().stringify())) {
+ return true;
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception while checking leadership of " + _controllerId + " in cluster: "
+ + _clusterId, e);
+ }
+ return false;
+ }
+
+ /**
+ * Install the controller message listener, its message handler factories and the pipeline
+ * listeners. Called when this controller becomes leader; failures are logged.
+ */
+ void addListenersToController(GenericHelixController pipeline) {
+ try {
+ /**
+ * setup controller message listener and register message handlers
+ */
+ _connection.addControllerMessageListener(this, _messagingService.getExecutor(),
+ _clusterId);
+ MessageHandlerFactory defaultControllerMsgHandlerFactory =
+ new DefaultControllerMessageHandlerFactory();
+ _messagingService.getExecutor().registerMessageHandlerFactory(
+ defaultControllerMsgHandlerFactory.getMessageType(), defaultControllerMsgHandlerFactory);
+ MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
+ new DefaultSchedulerMessageHandlerFactory(_manager);
+ _messagingService.getExecutor().registerMessageHandlerFactory(
+ defaultSchedulerMsgHandlerFactory.getMessageType(), defaultSchedulerMsgHandlerFactory);
+ MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
+ new DefaultParticipantErrorMessageHandlerFactory(_manager);
+ _messagingService.getExecutor().registerMessageHandlerFactory(
+ defaultParticipantErrorMessageHandlerFactory.getMessageType(),
+ defaultParticipantErrorMessageHandlerFactory);
+
+ /**
+ * setup generic-controller
+ */
+ _connection.addConfigChangeListener(this, pipeline, _clusterId);
+ _connection.addLiveInstanceChangeListener(this, pipeline, _clusterId);
+ _connection.addIdealStateChangeListener(this, pipeline, _clusterId);
+ _connection.addControllerListener(this, pipeline, _clusterId);
+ } catch (ZkInterruptedException e) {
+ LOG.warn("zk connection is interrupted during addListenersToController()"
+ + e);
+ } catch (Exception e) {
+ LOG.error("Error addListenersToController", e);
+ }
+ }
+
+ /**
+ * Remove the listeners installed by {@link #addListenersToController}. Called when this
+ * controller gives up leadership.
+ */
+ void removeListenersFromController(GenericHelixController pipeline) {
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(_manager.getClusterName());
+ /**
+ * reset generic-controller
+ */
+ _connection.removeListener(this, pipeline, keyBuilder.instanceConfigs());
+ _connection.removeListener(this, pipeline, keyBuilder.liveInstances());
+ _connection.removeListener(this, pipeline, keyBuilder.idealStates());
+ _connection.removeListener(this, pipeline, keyBuilder.controller());
+
+ /**
+ * reset controller message listener and unregister all message handlers
+ */
+ _connection.removeListener(this, _messagingService.getExecutor(),
+ keyBuilder.controllerMessages());
+ }
+
+ HelixManager getManager() {
+ return _manager;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
new file mode 100644
index 0000000..06e8cd8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -0,0 +1,148 @@
+package org.apache.helix.manager.zk;
+
+import java.lang.management.ManagementFactory;
+
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.model.LeaderHistory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.log4j.Logger;
+
+// TODO GenericHelixController has a controller-listener, we can invoke leader-election from there
+public class ZkHelixLeaderElection implements ControllerChangeListener {
+ private static Logger LOG = Logger.getLogger(ZkHelixLeaderElection.class);
+
+ final ZkHelixController _controller;
+ final ClusterId _clusterId;
+ final ControllerId _controllerId;
+ final HelixManager _manager;
+ final GenericHelixController _pipeline;
+
+ public ZkHelixLeaderElection(ZkHelixController controller, GenericHelixController pipeline) {
+ _controller = controller;
+ _clusterId = controller.getClusterId();
+ _controllerId = controller.getControllerId();
+ _pipeline = pipeline;
+ _manager = controller.getManager();
+ }
+
+ /**
+ * may be accessed by multiple threads: zk-client thread and
+ * ZkHelixManager.disconnect()->reset() TODO: Refactor accessing
+ * HelixMangerMain class statically
+ */
+ @Override
+ public synchronized void onControllerChange(NotificationContext changeContext) {
+ HelixManager manager = changeContext.getManager();
+ if (manager == null) {
+ LOG.error("missing attributes in changeContext. requires HelixManager");
+ return;
+ }
+
+ InstanceType type = _manager.getInstanceType();
+ if (type != InstanceType.CONTROLLER && type != InstanceType.CONTROLLER_PARTICIPANT) {
+ LOG.error("fail to become controller because incorrect instanceType (was " + type.toString()
+ + ", requires CONTROLLER | CONTROLLER_PARTICIPANT)");
+ return;
+ }
+
+ try {
+ if (changeContext.getType().equals(NotificationContext.Type.INIT)
+ || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
+ LOG.info(_controllerId + " is trying to acquire leadership for cluster: " + _clusterId);
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
+ boolean success = tryUpdateController(_manager);
+ if (success) {
+ LOG.info(_controllerId + " acquires leadership of cluster: " + _clusterId);
+
+ updateHistory(_manager);
+ _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
+ _controller.addListenersToController(_pipeline);
+ _controller.startTimerTasks();
+ }
+ }
+ } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
+ LOG.info(_controllerId + " reqlinquishes leadership of cluster: " + _clusterId);
+ _controller.stopTimerTasks();
+ _controller.removeListenersFromController(_pipeline);
+
+ /**
+ * clear write-through cache
+ */
+ _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
+ }
+
+ } catch (Exception e) {
+ LOG.error("Exception when trying to become leader", e);
+ }
+ }
+
+ private boolean tryUpdateController(HelixManager manager) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ LiveInstance leader = new LiveInstance(manager.getInstanceName());
+ try {
+ leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+ leader.setSessionId(manager.getSessionId());
+ leader.setHelixVersion(manager.getVersion());
+ if (ZKPropertyTransferServer.getInstance() != null) {
+ String zkPropertyTransferServiceUrl =
+ ZKPropertyTransferServer.getInstance().getWebserviceUrl();
+ if (zkPropertyTransferServiceUrl != null) {
+ leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
+ }
+ } else {
+ LOG.warn("ZKPropertyTransferServer instnace is null");
+ }
+ boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
+ if (success) {
+ return true;
+ } else {
+ LOG.info("Unable to become leader probably because some other controller becames the leader");
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "Exception when trying to updating leader record in cluster:" + manager.getClusterName()
+ + ". Need to check again whether leader node has been created or not", e);
+ }
+
+ leader = accessor.getProperty(keyBuilder.controllerLeader());
+ if (leader != null) {
+ String leaderSessionId = leader.getSessionId();
+ LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
+ + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
+
+ if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void updateHistory(HelixManager manager) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
+ if (history == null) {
+ history = new LeaderHistory(PropertyType.HISTORY.toString());
+ }
+ history.updateHistory(manager.getClusterName(), manager.getInstanceName());
+ accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c589fb8d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
new file mode 100644
index 0000000..eba96c9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -0,0 +1,475 @@
+package org.apache.helix.manager.zk;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceInfoProvider;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.accessor.ParticipantAccessor;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.healthcheck.ParticipantHealthReportTask;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+ /**
+ * ZooKeeper-based {@link HelixParticipant}.
+ * <p>
+ * On {@link #onConnected()} (also invoked by {@link #startAsync()}) the participant first resets
+ * state left from a previous session and then, in order: validates the cluster structure,
+ * auto-joins the cluster if allowed, invokes pre-connect callbacks, creates its ephemeral
+ * live-instance node, carries over current states from earlier sessions, registers message
+ * handlers, creates the health-check path, starts timer tasks and initializes the connection's
+ * handlers.
+ */
+ public class ZkHelixParticipant implements HelixParticipant {
+ private static final Logger LOG = Logger.getLogger(ZkHelixParticipant.class);
+
+ /**
+ * extra time (ms), on top of the zk session timeout, to wait for a live-instance owned by another
+ * session to go away before the final create attempt
+ */
+ private static final long LIVE_INSTANCE_WAIT_MARGIN_MS = 5000L;
+
+ final ZkHelixConnection _connection;
+ final ClusterId _clusterId;
+ final ParticipantId _participantId;
+ final ZKHelixDataAccessor _accessor;
+ final BaseDataAccessor<ZNRecord> _baseAccessor;
+ final PropertyKey.Builder _keyBuilder;
+ final ConfigAccessor _configAccessor;
+ final ClusterAccessor _clusterAccessor;
+ final ParticipantAccessor _participantAccessor;
+ final DefaultMessagingService _messagingService;
+ final List<PreConnectCallback> _preConnectCallbacks;
+ final List<HelixTimerTask> _timerTasks;
+ final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
+
+ /**
+ * state-transition message handler factory for helix-participant
+ */
+ final StateMachineEngine _stateMachineEngine;
+
+ /**
+ * optional source of extra fields merged into the live-instance record; may be null
+ */
+ LiveInstanceInfoProvider _liveInstanceInfoProvider;
+
+ /**
+ * Wires up the accessors, messaging service, state machine engine and the health-report timer
+ * task for one participant. Registration with the connection happens in {@link #startAsync()}.
+ *
+ * @param connection the zk connection this participant runs on
+ * @param clusterId the cluster to participate in
+ * @param participantId the id of this participant
+ */
+ public ZkHelixParticipant(ZkHelixConnection connection, ClusterId clusterId,
+ ParticipantId participantId) {
+ _connection = connection;
+ _accessor = (ZKHelixDataAccessor) connection.createDataAccessor(clusterId);
+ _baseAccessor = _accessor.getBaseDataAccessor();
+ _keyBuilder = _accessor.keyBuilder();
+ _clusterAccessor = connection.createClusterAccessor(clusterId);
+ _participantAccessor = connection.createParticipantAccessor(clusterId);
+ _configAccessor = connection.getConfigAccessor();
+
+ _clusterId = clusterId;
+ _participantId = participantId;
+
+ _messagingService = (DefaultMessagingService) connection.createMessagingService(this);
+ HelixManager manager = new HelixConnectionAdaptor(this);
+ _stateMachineEngine = new HelixStateMachineEngine(manager);
+ _preConnectCallbacks = new ArrayList<PreConnectCallback>();
+ _timerTasks = new ArrayList<HelixTimerTask>();
+ _participantHealthInfoCollector =
+ new ParticipantHealthReportCollectorImpl(manager, participantId.stringify());
+
+ _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
+
+ }
+
+ @Override
+ public ClusterId getClusterId() {
+ return _clusterId;
+ }
+
+ @Override
+ public ParticipantId getParticipantId() {
+ return _participantId;
+ }
+
+ @Override
+ public HelixConnection getConnection() {
+ return _connection;
+ }
+
+ /**
+ * start every registered timer task
+ */
+ void startTimerTasks() {
+ for (HelixTimerTask task : _timerTasks) {
+ task.start();
+ }
+ }
+
+ /**
+ * stop every registered timer task
+ */
+ void stopTimerTasks() {
+ for (HelixTimerTask task : _timerTasks) {
+ task.stop();
+ }
+ }
+
+ void reset() {
+ /**
+ * stop timer tasks, reset all handlers, make sure cleanup completed for previous session,
+ * disconnect if cleanup fails
+ */
+ stopTimerTasks();
+ _connection.resetHandlers(this);
+
+ /**
+ * clear write-through cache
+ */
+ _accessor.getBaseDataAccessor().reset();
+ }
+
+ /**
+ * Create the ephemeral live-instance node for this participant under the current session.
+ * If the node already exists:
+ * - it vanished while we looked at it: retry the create;
+ * - it is owned by this session: overwrite a stale session-id field if needed;
+ * - it is owned by another session: wait session-timeout + margin, then try once more.
+ *
+ * @throws HelixException if the live-instance cannot be created or updated
+ */
+ private void createLiveInstance() {
+ String liveInstancePath = _keyBuilder.liveInstance(_participantId.stringify()).getPath();
+ String sessionId = _connection.getSessionId().stringify();
+ LiveInstance liveInstance = new LiveInstance(_participantId.stringify());
+ liveInstance.setSessionId(sessionId);
+ liveInstance.setHelixVersion(_connection.getHelixVersion());
+ liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+
+ if (_liveInstanceInfoProvider != null) {
+ LOG.info("Additional live instance information is provided: " + _liveInstanceInfoProvider);
+ ZNRecord additionalLiveInstanceInfo =
+ _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
+ if (additionalLiveInstanceInfo != null) {
+ additionalLiveInstanceInfo.merge(liveInstance.getRecord());
+ ZNRecord mergedLiveInstance =
+ new ZNRecord(additionalLiveInstanceInfo, _participantId.stringify());
+ liveInstance = new LiveInstance(mergedLiveInstance);
+ LOG.info("Participant: " + _participantId + ", mergedLiveInstance: " + liveInstance);
+ }
+ }
+
+ boolean retry;
+ do {
+ retry = false;
+ boolean success =
+ _baseAccessor.create(liveInstancePath, liveInstance.getRecord(), AccessOption.EPHEMERAL);
+ if (!success) {
+ LOG.warn("found another participant with same name: " + _participantId + " in cluster "
+ + _clusterId);
+
+ Stat stat = new Stat();
+ ZNRecord record = _baseAccessor.get(liveInstancePath, stat, 0);
+ if (record == null) {
+ /**
+ * live-instance is gone as we check it, retry create live-instance
+ */
+ retry = true;
+ } else {
+ String ephemeralOwner = Long.toHexString(stat.getEphemeralOwner());
+ if (ephemeralOwner.equals(sessionId)) {
+ /**
+ * update sessionId field in live-instance if necessary
+ */
+ LiveInstance curLiveInstance = new LiveInstance(record);
+ // compare from the non-null side: a stored record may lack a session-id field
+ if (!sessionId.equals(curLiveInstance.getSessionId())) {
+ /**
+ * in last handle-new-session,
+ * live-instance is created by new zkconnection with stale session-id inside
+ * just update session-id field
+ */
+ LOG.info("overwriting session-id by ephemeralOwner: " + ephemeralOwner
+ + ", old-sessionId: " + curLiveInstance.getSessionId() + ", new-sessionId: "
+ + sessionId);
+
+ curLiveInstance.setSessionId(sessionId);
+ success =
+ _baseAccessor.set(liveInstancePath, curLiveInstance.getRecord(),
+ stat.getVersion(), AccessOption.EPHEMERAL);
+ if (!success) {
+ LOG.error("Someone changes sessionId as we update, should not happen");
+ throw new HelixException("fail to create live-instance for " + _participantId);
+ }
+ }
+ } else {
+ /**
+ * wait for a while, in case previous helix-participant exits unexpectedly
+ * and its live-instance still hangs around until session timeout
+ */
+ try {
+ TimeUnit.MILLISECONDS.sleep(_connection.getSessionTimeout()
+ + LIVE_INSTANCE_WAIT_MARGIN_MS);
+ } catch (InterruptedException ex) {
+ LOG.warn("Sleep interrupted while waiting for previous live-instance to go away", ex);
+ // restore the interrupt status so code further up the stack can observe it
+ Thread.currentThread().interrupt();
+ }
+ /**
+ * give a last try after exit while loop
+ */
+ retry = true;
+ break;
+ }
+ }
+ }
+ } while (retry);
+
+ /**
+ * give a last shot
+ */
+ if (retry) {
+ boolean success =
+ _baseAccessor.create(liveInstancePath, liveInstance.getRecord(), AccessOption.EPHEMERAL);
+ if (!success) {
+ LOG.error("instance: " + _participantId + " already has a live-instance in cluster "
+ + _clusterId);
+ throw new HelixException("fail to create live-instance for " + _participantId);
+ }
+ }
+ }
+
+ /**
+ * carry over current-states from last sessions
+ * set to initial state for current session only when state doesn't exist in current session.
+ * Previous-session current states whose state model definition cannot be found are skipped.
+ * Afterwards the current-state parent nodes of all previous sessions are removed.
+ */
+ private void carryOverPreviousCurrentState() {
+ String sessionId = _connection.getSessionId().stringify();
+ String participantName = _participantId.stringify();
+ List<String> sessions = _accessor.getChildNames(_keyBuilder.sessions(participantName));
+
+ for (String session : sessions) {
+ if (session.equals(sessionId)) {
+ continue;
+ }
+
+ List<CurrentState> lastCurStates =
+ _accessor.getChildValues(_keyBuilder.currentStates(participantName, session));
+
+ for (CurrentState lastCurState : lastCurStates) {
+ LOG.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId()
+ + " to current session: " + sessionId);
+ String stateModelDefRef = lastCurState.getStateModelDefRef();
+ if (stateModelDefRef == null) {
+ LOG.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
+ + lastCurState);
+ continue;
+ }
+ StateModelDefinition stateModel =
+ _accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef));
+ if (stateModel == null) {
+ // the referenced definition is missing; without it there is no initial state to use
+ LOG.error("skip carry-over because state model definition: " + stateModelDefRef
+ + " does not exist. previous current-state: " + lastCurState);
+ continue;
+ }
+
+ String curStatePath =
+ _keyBuilder.currentState(participantName, sessionId, lastCurState.getResourceName())
+ .getPath();
+ _accessor.getBaseDataAccessor().update(
+ curStatePath,
+ new CurStateCarryOverUpdater(sessionId, stateModel.getInitialState(), lastCurState),
+ AccessOption.PERSISTENT);
+ }
+ }
+
+ /**
+ * remove previous current state parent nodes
+ */
+ for (String session : sessions) {
+ if (session.equals(sessionId)) {
+ continue;
+ }
+
+ PropertyKey key = _keyBuilder.currentStates(participantName, session);
+ LOG.info("Removing current states from previous sessions. path: " + key.getPath());
+ _accessor.removeProperty(key);
+ }
+ }
+
+ /**
+ * Read cluster config and see if instance can auto join the cluster.
+ * When the participant structure is missing and auto-join is allowed, the participant is added
+ * with host/port parsed from a "host_port" style name (port = -1 when it cannot be parsed).
+ *
+ * @throws HelixException if the participant structure is missing and auto-join is disabled
+ */
+ private void joinCluster() {
+ boolean autoJoin = false;
+ try {
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
+ _clusterId.stringify()).build();
+ autoJoin =
+ Boolean
+ .parseBoolean(_configAccessor.get(scope, HelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
+ LOG.info("instance: " + _participantId + " auto-joining " + _clusterId + " is " + autoJoin);
+ } catch (Exception e) {
+ // best-effort: an unreadable config means auto-join stays disabled, but record why
+ LOG.warn("Unable to read " + HelixManager.ALLOW_PARTICIPANT_AUTO_JOIN + " for cluster: "
+ + _clusterId + ", assuming auto-join is disabled", e);
+ }
+
+ if (!_participantAccessor.isParticipantStructureValid(_participantId)) {
+ if (!autoJoin) {
+ throw new HelixException("Initial cluster structure is not set up for instance: "
+ + _participantId + ", instanceType: " + getType());
+ } else {
+ LOG.info(_participantId + " is auto-joining cluster: " + _clusterId);
+ String participantName = _participantId.stringify();
+ String hostName = participantName;
+ int port = -1;
+ int lastPos = participantName.lastIndexOf("_");
+ if (lastPos > 0) {
+ hostName = participantName.substring(0, lastPos);
+ try {
+ port = Integer.parseInt(participantName.substring(lastPos + 1));
+ } catch (NumberFormatException e) {
+ // suffix after the last '_' is not a number; keep port = -1
+ }
+ }
+ ParticipantConfig.Builder builder =
+ new ParticipantConfig.Builder(_participantId).hostName(hostName).port(port)
+ .enabled(true);
+ _clusterAccessor.addParticipantToCluster(builder.build());
+ }
+ }
+ }
+
+ /**
+ * register the state-machine engine for state-transition messages, add message and controller
+ * listeners, and register the scheduled-task state model factory
+ */
+ private void setupMsgHandler() {
+ _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
+ _stateMachineEngine);
+
+ /**
+ * it's ok to add a listener multiple times, since we check existence in
+ * ZkHelixConnection#addXXXListner()
+ */
+ _connection.addMessageListener(this, _messagingService.getExecutor(), _clusterId,
+ _participantId);
+ _connection.addControllerListener(this, _accessor, _clusterId);
+
+ ScheduledTaskStateModelFactory stStateModelFactory =
+ new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
+ _stateMachineEngine.registerStateModelFactory(
+ DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
+ }
+
+ /**
+ * create zk path for health check info
+ * TODO move it to cluster-setup
+ */
+ private void createHealthCheckPath() {
+ PropertyKey healthCheckInfoKey = _keyBuilder.healthReports(_participantId.stringify());
+ if (_accessor.createProperty(healthCheckInfoKey, null)) {
+ LOG.info("Created healthcheck info path: " + healthCheckInfoKey.getPath());
+ }
+ }
+
+ /**
+ * Set up this participant for the current session (see class javadoc for the sequence).
+ *
+ * @throws HelixException if the cluster structure is invalid or the participant cannot join
+ */
+ void init() {
+ /**
+ * from here on, we are dealing with new session
+ */
+ if (!_clusterAccessor.isClusterStructureValid()) {
+ throw new HelixException("Cluster structure is not set up for cluster: " + _clusterId);
+ }
+
+ /**
+ * auto-join
+ */
+ joinCluster();
+
+ /**
+ * Invoke PreConnectCallbacks
+ */
+ for (PreConnectCallback callback : _preConnectCallbacks) {
+ callback.onPreConnect();
+ }
+
+ createLiveInstance();
+ carryOverPreviousCurrentState();
+
+ /**
+ * setup message listener
+ */
+ setupMsgHandler();
+
+ /**
+ * start health check timer task
+ */
+ createHealthCheckPath();
+ startTimerTasks();
+
+ /**
+ * init handlers
+ * ok to init message handler and data-accessor twice
+ * the second init will be skipped (see CallbackHandler)
+ */
+ _connection.initHandlers(this);
+ }
+
+ @Override
+ public void onConnected() {
+ reset();
+ init();
+ }
+
+ @Override
+ public void onDisconnecting() {
+ LOG.info("disconnecting " + _participantId + "(" + getType() + ") from " + _clusterId);
+
+ reset();
+
+ /**
+ * shall we shutdown thread pool first to avoid reset() being invoked in the middle of state
+ * transition?
+ */
+ _messagingService.getExecutor().shutdown();
+ _accessor.shutdown();
+
+ }
+
+ /**
+ * register as a connection-state listener and set up the current session immediately
+ */
+ @Override
+ public void startAsync() {
+ _connection.addConnectionStateListener(this);
+ onConnected();
+ }
+
+ /**
+ * unregister as a connection-state listener and tear down this participant
+ */
+ @Override
+ public void stopAsync() {
+ _connection.removeConnectionStateListener(this);
+ onDisconnecting();
+ }
+
+ @Override
+ public ClusterMessagingService getMessagingService() {
+ return _messagingService;
+ }
+
+ @Override
+ public StateMachineEngine getStateMachineEngine() {
+ return _stateMachineEngine;
+ }
+
+ @Override
+ public Id getId() {
+ return getParticipantId();
+ }
+
+ @Override
+ public InstanceType getType() {
+ return InstanceType.PARTICIPANT;
+ }
+
+ /**
+ * add a callback that init() invokes before the live-instance is created
+ */
+ @Override
+ public void addPreConnectCallback(PreConnectCallback callback) {
+ LOG.info("Adding preconnect callback: " + callback);
+ _preConnectCallbacks.add(callback);
+ }
+
+ /**
+ * set the provider whose record is merged into the live-instance at creation time
+ */
+ @Override
+ public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
+ _liveInstanceInfoProvider = liveInstanceInfoProvider;
+ }
+
+ public HelixDataAccessor getAccessor() {
+ return _accessor;
+ }
+
+ public ClusterAccessor getClusterAccessor() {
+ return _clusterAccessor;
+ }
+
+ public ParticipantAccessor getParticipantAccessor() {
+ return _participantAccessor;
+ }
+
+ }