You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/10/25 03:21:20 UTC
[07/10] [HELIX-279] Apply gc handling fixes to main ZKHelixManager
class
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
deleted file mode 100644
index 4f549e4..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
+++ /dev/null
@@ -1,693 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.ConfigChangeListener;
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HealthStateChangeListener;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.IdealStateChangeListener;
-import org.apache.helix.InstanceConfigChangeListener;
-import org.apache.helix.InstanceType;
-import org.apache.helix.LiveInstanceChangeListener;
-import org.apache.helix.LiveInstanceInfoProvider;
-import org.apache.helix.MessageListener;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ScopedConfigChangeListener;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper.States;
-
-public abstract class AbstractManager implements HelixManager, IZkStateListener {
- private static Logger LOG = Logger.getLogger(AbstractManager.class);
-
- final String _zkAddress;
- final String _clusterName;
- final String _instanceName;
- final InstanceType _instanceType;
- final int _sessionTimeout;
- final List<PreConnectCallback> _preConnectCallbacks;
- protected final List<CallbackHandler> _handlers;
- final HelixManagerProperties _properties;
-
- /**
- * helix version#
- */
- final String _version;
-
- protected ZkClient _zkclient = null;
- final DefaultMessagingService _messagingService;
-
- BaseDataAccessor<ZNRecord> _baseDataAccessor;
- ZKHelixDataAccessor _dataAccessor;
- final Builder _keyBuilder;
- ConfigAccessor _configAccessor;
- ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
- LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
- final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>();
-
- volatile String _sessionId;
-
- /**
- * Keep track of timestamps that zk State has become Disconnected
- * If in a _timeWindowLengthMs window zk State has become Disconnected
- * for more than_maxDisconnectThreshold times disconnect the zkHelixManager
- */
- final List<Long> _disconnectTimeHistory = new LinkedList<Long>();
-
- final int _flappingTimeWindowMs;
- final int _maxDisconnectThreshold;
-
- public AbstractManager(String zkAddress, String clusterName, String instanceName,
- InstanceType instanceType) {
-
- LOG.info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: "
- + clusterName + ", instanceName: " + instanceName + ", type: " + instanceType);
-
- _zkAddress = zkAddress;
- _clusterName = clusterName;
- _instanceType = instanceType;
- _instanceName = instanceName;
- _preConnectCallbacks = new LinkedList<PreConnectCallback>();
- _handlers = new ArrayList<CallbackHandler>();
- _properties = new HelixManagerProperties("cluster-manager-version.properties");
- _version = _properties.getVersion();
-
- _keyBuilder = new Builder(clusterName);
- _messagingService = new DefaultMessagingService(this);
-
- /**
- * use system property if available
- */
- _flappingTimeWindowMs =
- getSystemPropertyAsInt("helixmanager.flappingTimeWindow",
- ZKHelixManager.FLAPPING_TIME_WINDIOW);
-
- _maxDisconnectThreshold =
- getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold",
- ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
-
- _sessionTimeout =
- getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
-
- }
-
- private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
- String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue);
-
- try {
- int value = Integer.parseInt(valueString);
- if (value > 0) {
- return value;
- }
- } catch (NumberFormatException e) {
- LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
- + ", using default value: " + propertyDefaultValue);
- }
-
- return propertyDefaultValue;
- }
-
- /**
- * different types of helix manager should impl its own handle new session logic
- */
- // public abstract void handleNewSession();
-
- @Override
- public void connect() throws Exception {
- LOG.info("ClusterManager.connect()");
- if (isConnected()) {
- LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName
- + " already connected. skip connect");
- return;
- }
-
- try {
- createClient();
- _messagingService.onConnected();
- } catch (Exception e) {
- LOG.error("fail to connect " + _instanceName, e);
- disconnect();
- throw e;
- }
- }
-
- @Override
- public boolean isConnected() {
- if (_zkclient == null) {
- return false;
- }
- ZkConnection zkconnection = (ZkConnection) _zkclient.getConnection();
- if (zkconnection != null) {
- States state = zkconnection.getZookeeperState();
- return state == States.CONNECTED;
- }
- return false;
- }
-
- /**
- * specific disconnect logic for each helix-manager type
- */
- abstract void doDisconnect();
-
- /**
- * This function can be called when the connection are in bad state(e.g. flapping),
- * in which isConnected() could be false and we want to disconnect from cluster.
- */
- @Override
- public void disconnect() {
- LOG.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName);
-
- try {
- /**
- * stop all timer tasks
- */
- stopTimerTasks();
-
- /**
- * shutdown thread pool first to avoid reset() being invoked in the middle of state
- * transition
- */
- _messagingService.getExecutor().shutdown();
-
- // TODO reset user defined handlers only
- resetHandlers();
-
- _dataAccessor.shutdown();
-
- doDisconnect();
-
- _zkclient.unsubscribeAll();
- } finally {
- _zkclient.close();
- LOG.info("Cluster manager: " + _instanceName + " disconnected");
- }
- }
-
- @Override
- public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception {
- addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE,
- new EventType[] {
- EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception {
- addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE,
- new EventType[] {
- EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted,
- EventType.NodeCreated
- });
- }
-
- @Override
- public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
- addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
- new EventType[] {
- EventType.NodeChildrenChanged
- });
- }
-
- @Override
- public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
- throws Exception {
- addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
- new EventType[] {
- EventType.NodeChildrenChanged
- });
- }
-
- @Override
- public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
- throws Exception {
- Builder keyBuilder = new Builder(_clusterName);
-
- PropertyKey propertyKey = null;
- switch (scope) {
- case CLUSTER:
- propertyKey = keyBuilder.clusterConfigs();
- break;
- case PARTICIPANT:
- propertyKey = keyBuilder.instanceConfigs();
- break;
- case RESOURCE:
- propertyKey = keyBuilder.resourceConfigs();
- break;
- default:
- break;
- }
-
- if (propertyKey != null) {
- addListener(listener, propertyKey, ChangeType.CONFIG, new EventType[] {
- EventType.NodeChildrenChanged
- });
- } else {
- LOG.error("Can't add listener to config scope: " + scope);
- }
- }
-
- @Override
- public void addMessageListener(MessageListener listener, String instanceName) {
- addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE,
- new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
- String instanceName, String sessionId) throws Exception {
- addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId),
- ChangeType.CURRENT_STATE, new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
- throws Exception {
- addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH,
- new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
- addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW,
- new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public void addControllerListener(ControllerChangeListener listener) {
- addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER,
- new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- void addControllerMessageListener(MessageListener listener) {
- addListener(listener, new Builder(_clusterName).controllerMessages(),
- ChangeType.MESSAGES_CONTROLLER, new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public boolean removeListener(PropertyKey key, Object listener) {
- LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: "
- + _clusterName + " by instance: " + _instanceName);
-
- synchronized (this) {
- List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
- for (CallbackHandler handler : _handlers) {
- // compare property-key path and listener reference
- if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) {
- toRemove.add(handler);
- }
- }
-
- _handlers.removeAll(toRemove);
-
- // handler.reset() may modify the handlers list, so do it outside the iteration
- for (CallbackHandler handler : toRemove) {
- handler.reset();
- }
- }
-
- return true;
- }
-
- @Override
- public HelixDataAccessor getHelixDataAccessor() {
- checkConnected();
- return _dataAccessor;
- }
-
- @Override
- public ConfigAccessor getConfigAccessor() {
- checkConnected();
- return _configAccessor;
- }
-
- @Override
- public String getClusterName() {
- return _clusterName;
- }
-
- @Override
- public String getInstanceName() {
- return _instanceName;
- }
-
- @Override
- public String getSessionId() {
- checkConnected();
- return _sessionId;
- }
-
- @Override
- public long getLastNotificationTime() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public HelixAdmin getClusterManagmentTool() {
- checkConnected();
- if (_zkclient != null) {
- return new ZKHelixAdmin(_zkclient);
- }
-
- LOG.error("Couldn't get ZKClusterManagementTool because zkclient is null");
- return null;
- }
-
- @Override
- public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
- checkConnected();
-
- if (_helixPropertyStore == null) {
- String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
-
- _helixPropertyStore =
- new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient), path,
- null);
- }
-
- return _helixPropertyStore;
- }
-
- @Override
- public ClusterMessagingService getMessagingService() {
- // The caller can register message handler factories on messaging service before the
- // helix manager is connected. Thus we do not check connected here
- return _messagingService;
- }
-
- @Override
- public ParticipantHealthReportCollector getHealthReportCollector() {
- // helix-participant will override this
- return null;
- }
-
- @Override
- public InstanceType getInstanceType() {
- return _instanceType;
- }
-
- @Override
- public String getVersion() {
- return _version;
- }
-
- @Override
- public HelixManagerProperties getProperties() {
- return _properties;
- }
-
- @Override
- public StateMachineEngine getStateMachineEngine() {
- // helix-participant will override this
- return null;
- }
-
- @Override
- public abstract boolean isLeader();
-
- @Override
- public void startTimerTasks() {
- for (HelixTimerTask task : _timerTasks) {
- task.start();
- }
-
- }
-
- @Override
- public void stopTimerTasks() {
- for (HelixTimerTask task : _timerTasks) {
- task.stop();
- }
-
- }
-
- @Override
- public void addPreConnectCallback(PreConnectCallback callback) {
- LOG.info("Adding preconnect callback: " + callback);
- _preConnectCallbacks.add(callback);
- }
-
- @Override
- public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
- _liveInstanceInfoProvider = liveInstanceInfoProvider;
- }
-
- /**
- * wait until we get a non-zero session-id. note that we might lose zkconnection
- * right after we read session-id. but it's ok to get stale session-id and we will have
- * another handle-new-session callback to correct this.
- */
- protected void waitUntilConnected() {
- boolean isConnected;
- do {
- isConnected =
- _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
- if (!isConnected) {
- LOG.error("fail to connect zkserver: " + _zkAddress + " in "
- + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
- + ", clusterName: " + _clusterName);
- continue;
- }
-
- ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection());
- _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
-
- /**
- * at the time we read session-id, zkconnection might be lost again
- * wait until we get a non-zero session-id
- */
- } while ("0".equals(_sessionId));
-
- LOG.info("Handling new session, session id: " + _sessionId + ", instance: " + _instanceName
- + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName + ", zkconnection: "
- + ((ZkConnection) _zkclient.getConnection()).getZookeeper());
- }
-
- protected void checkConnected() {
- if (!isConnected()) {
- throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
- }
- }
-
- protected void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
- EventType[] eventType) {
- checkConnected();
-
- PropertyType type = propertyKey.getType();
-
- synchronized (this) {
- for (CallbackHandler handler : _handlers) {
- // compare property-key path and listener reference
- if (handler.getPath().equals(propertyKey.getPath())
- && handler.getListener().equals(listener)) {
- LOG.info("Listener: " + listener + " on path: " + propertyKey.getPath()
- + " already exists. skip add");
-
- return;
- }
- }
-
- CallbackHandler newHandler =
- new CallbackHandler(this, _zkclient, propertyKey, listener, eventType, changeType);
-
- _handlers.add(newHandler);
- LOG.info("Added listener: " + listener + " for type: " + type + " to path: "
- + newHandler.getPath());
- }
- }
-
- protected void initHandlers(List<CallbackHandler> handlers) {
- synchronized (this) {
- if (handlers != null) {
- for (CallbackHandler handler : handlers) {
- handler.init();
- LOG.info("init handler: " + handler.getPath() + ", " + handler.getListener());
- }
- }
- }
- }
-
- protected void resetHandlers() {
- synchronized (this) {
- if (_handlers != null) {
- // get a copy of the list and iterate over the copy list
- // in case handler.reset() modify the original handler list
- List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
- tmpHandlers.addAll(_handlers);
-
- for (CallbackHandler handler : tmpHandlers) {
- handler.reset();
- LOG.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
- }
- }
- }
- }
-
- /**
- * different helix-manager may override this to have a cache-enabled based-data-accessor
- * @param baseDataAccessor
- * @return
- */
- BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
- return baseDataAccessor;
- }
-
- void createClient() throws Exception {
- PathBasedZkSerializer zkSerializer =
- ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
-
- _zkclient =
- new ZkClient(_zkAddress, _sessionTimeout, ZkClient.DEFAULT_CONNECTION_TIMEOUT, zkSerializer);
-
- ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
-
- _baseDataAccessor = createBaseDataAccessor(baseDataAccessor);
-
- _dataAccessor = new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
- _configAccessor = new ConfigAccessor(_zkclient);
-
- int retryCount = 0;
-
- _zkclient.subscribeStateChanges(this);
- while (retryCount < 3) {
- try {
- _zkclient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
- handleStateChanged(KeeperState.SyncConnected);
- handleNewSession();
- break;
- } catch (HelixException e) {
- LOG.error("fail to createClient.", e);
- throw e;
- } catch (Exception e) {
- retryCount++;
-
- LOG.error("fail to createClient. retry " + retryCount, e);
- if (retryCount == 3) {
- throw e;
- }
- }
- }
- }
-
- // TODO separate out flapping detection code
- @Override
- public void handleStateChanged(KeeperState state) throws Exception {
- switch (state) {
- case SyncConnected:
- ZkConnection zkConnection = (ZkConnection) _zkclient.getConnection();
- LOG.info("KeeperState: " + state + ", zookeeper:" + zkConnection.getZookeeper());
- break;
- case Disconnected:
- LOG.info("KeeperState:" + state + ", disconnectedSessionId: " + _sessionId + ", instance: "
- + _instanceName + ", type: " + _instanceType);
-
- /**
- * Track the time stamp that the disconnected happens, then check history and see if
- * we should disconnect the helix-manager
- */
- _disconnectTimeHistory.add(System.currentTimeMillis());
- if (isFlapping()) {
- LOG.error("instanceName: " + _instanceName + " is flapping. diconnect it. "
- + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
- + _flappingTimeWindowMs + "ms.");
- disconnect();
- }
- break;
- case Expired:
- LOG.info("KeeperState:" + state + ", expiredSessionId: " + _sessionId + ", instance: "
- + _instanceName + ", type: " + _instanceType);
- break;
- default:
- break;
- }
- }
-
- /**
- * If zk state has changed into Disconnected for _maxDisconnectThreshold times during previous
- * _timeWindowLengthMs Ms
- * time window, we think that there are something wrong going on and disconnect the zkHelixManager
- * from zk.
- */
- private boolean isFlapping() {
- if (_disconnectTimeHistory.size() == 0) {
- return false;
- }
- long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
-
- // Remove disconnect history timestamp that are older than _flappingTimeWindowMs ago
- while ((_disconnectTimeHistory.get(0) + _flappingTimeWindowMs) < mostRecentTimestamp) {
- _disconnectTimeHistory.remove(0);
- }
- return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
- }
-
- /**
- * controller should override it to return a list of timers that need to start/stop when
- * leadership changes
- * @return
- */
- protected List<HelixTimerTask> getControllerHelixTimerTasks() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
deleted file mode 100644
index 1ed6dea..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
+++ /dev/null
@@ -1,175 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Timer;
-
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
-import org.apache.helix.healthcheck.HealthStatsAggregator;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.monitoring.ZKPathDataDumpTask;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class ControllerManager extends AbstractManager {
- private static Logger LOG = Logger.getLogger(ControllerManager.class);
-
- final GenericHelixController _controller = new GenericHelixController();
-
- // TODO merge into GenericHelixController
- private CallbackHandler _leaderElectionHandler = null;
-
- /**
- * status dump timer-task
- */
- static class StatusDumpTask extends HelixTimerTask {
- Timer _timer = null;
- final ZkClient zkclient;
- final AbstractManager helixController;
-
- public StatusDumpTask(ZkClient zkclient, AbstractManager helixController) {
- this.zkclient = zkclient;
- this.helixController = helixController;
- }
-
- @Override
- public void start() {
- long initialDelay = 30 * 60 * 1000;
- long period = 120 * 60 * 1000;
- int timeThresholdNoChange = 180 * 60 * 1000;
-
- if (_timer == null) {
- LOG.info("Start StatusDumpTask");
- _timer = new Timer("StatusDumpTimerTask", true);
- _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, zkclient,
- timeThresholdNoChange), initialDelay, period);
- }
-
- }
-
- @Override
- public void stop() {
- if (_timer != null) {
- LOG.info("Stop StatusDumpTask");
- _timer.cancel();
- _timer = null;
- }
- }
- }
-
- public ControllerManager(String zkAddress, String clusterName, String instanceName) {
- super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER);
-
- _timerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
- _timerTasks.add(new StatusDumpTask(_zkclient, this));
- }
-
- @Override
- protected List<HelixTimerTask> getControllerHelixTimerTasks() {
- return _timerTasks;
- }
-
- @Override
- public void handleNewSession() throws Exception {
- waitUntilConnected();
-
- /**
- * reset all handlers, make sure cleanup completed for previous session
- * disconnect if fail to cleanup
- */
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.reset();
- }
- // TODO reset user defined handlers only
- resetHandlers();
-
- /**
- * from here on, we are dealing with new session
- */
-
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.init();
- } else {
- _leaderElectionHandler =
- new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
- new DistributedLeaderElection(this, _controller), new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- }, ChangeType.CONTROLLER);
- }
-
- /**
- * init handlers
- * ok to init message handler and controller handlers twice
- * the second init will be skipped (see CallbackHandler)
- */
- initHandlers(_handlers);
- }
-
- @Override
- void doDisconnect() {
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.reset();
- }
- }
-
- @Override
- public boolean isLeader() {
- if (!isConnected()) {
- return false;
- }
-
- try {
- LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
- if (leader != null) {
- String leaderName = leader.getInstanceName();
- String sessionId = leader.getSessionId();
- if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
- && sessionId.equals(_sessionId)) {
- return true;
- }
- }
- } catch (Exception e) {
- // log
- }
- return false;
- }
-
- /**
- * helix-controller uses a write-through cache for external-view
- */
- @Override
- BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
- String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName);
- return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(extViewPath));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index ff3a264..d2b520b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -36,14 +36,14 @@ import org.apache.log4j.Logger;
public class ControllerManagerHelper {
private static Logger LOG = Logger.getLogger(ControllerManagerHelper.class);
- final AbstractManager _manager;
+ final HelixManager _manager;
final DefaultMessagingService _messagingService;
final List<HelixTimerTask> _controllerTimerTasks;
- public ControllerManagerHelper(AbstractManager manager) {
+ public ControllerManagerHelper(HelixManager manager, List<HelixTimerTask> controllerTimerTasks) {
_manager = manager;
_messagingService = (DefaultMessagingService) manager.getMessagingService();
- _controllerTimerTasks = manager.getControllerHelixTimerTasks();
+ _controllerTimerTasks = controllerTimerTasks;
}
public void addListenersToController(GenericHelixController controller) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
deleted file mode 100644
index c9ad0f3..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
-import org.apache.helix.healthcheck.HealthStatsAggregator;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
-import org.apache.helix.healthcheck.ParticipantHealthReportTask;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.participant.HelixStateMachineEngine;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class DistributedControllerManager extends AbstractManager {
- private static Logger LOG = Logger.getLogger(DistributedControllerManager.class);
-
- final StateMachineEngine _stateMachineEngine;
- final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
-
- CallbackHandler _leaderElectionHandler = null;
- final GenericHelixController _controller = new GenericHelixController();
-
- /**
- * hold timer tasks for controller only
- * we need to add/remove controller timer tasks during handle new session
- */
- final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<HelixTimerTask>();
-
- public DistributedControllerManager(String zkAddress, String clusterName, String instanceName) {
- super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER_PARTICIPANT);
-
- _stateMachineEngine = new HelixStateMachineEngine(this);
- _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
-
- _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
-
- _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
- _controllerTimerTasks.add(new ControllerManager.StatusDumpTask(_zkclient, this));
-
- }
-
- @Override
- public ParticipantHealthReportCollector getHealthReportCollector() {
- checkConnected();
- return _participantHealthInfoCollector;
- }
-
- @Override
- public StateMachineEngine getStateMachineEngine() {
- return _stateMachineEngine;
- }
-
- @Override
- protected List<HelixTimerTask> getControllerHelixTimerTasks() {
- return _controllerTimerTasks;
- }
-
- @Override
- public void handleNewSession() throws Exception {
- waitUntilConnected();
-
- ParticipantManagerHelper participantHelper =
- new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
-
- /**
- * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session
- * disconnect if fail to cleanup
- */
- stopTimerTasks();
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.reset();
- }
- resetHandlers();
-
- /**
- * clean up write-through cache
- */
- _baseDataAccessor.reset();
-
- /**
- * from here on, we are dealing with new session
- */
- if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
- throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
- }
-
- /**
- * auto-join
- */
- participantHelper.joinCluster();
-
- /**
- * Invoke PreConnectCallbacks
- */
- for (PreConnectCallback callback : _preConnectCallbacks) {
- callback.onPreConnect();
- }
-
- participantHelper.createLiveInstance();
-
- participantHelper.carryOverPreviousCurrentState();
-
- participantHelper.setupMsgHandler();
-
- /**
- * leader election
- */
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.init();
- } else {
- _leaderElectionHandler =
- new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
- new DistributedLeaderElection(this, _controller), new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- }, ChangeType.CONTROLLER);
- }
-
- /**
- * start health-check timer task
- */
- participantHelper.createHealthCheckPath();
- startTimerTasks();
-
- /**
- * init handlers
- * ok to init message handler, data-accessor, and controller handlers twice
- * the second init will be skipped (see CallbackHandler)
- */
- initHandlers(_handlers);
-
- }
-
- @Override
- void doDisconnect() {
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.reset();
- }
- }
-
- @Override
- public boolean isLeader() {
- if (!isConnected()) {
- return false;
- }
-
- try {
- LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
- if (leader != null) {
- String leaderName = leader.getInstanceName();
- String sessionId = leader.getSessionId();
- if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
- && sessionId.equals(_sessionId)) {
- return true;
- }
- }
- } catch (Exception e) {
- // log
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index 0ab8342..6a6d296 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -20,10 +20,12 @@ package org.apache.helix.manager.zk;
*/
import java.lang.management.ManagementFactory;
+import java.util.List;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyType;
@@ -40,12 +42,15 @@ import org.apache.log4j.Logger;
public class DistributedLeaderElection implements ControllerChangeListener {
private static Logger LOG = Logger.getLogger(DistributedLeaderElection.class);
- final AbstractManager _manager;
+ final HelixManager _manager;
final GenericHelixController _controller;
+ final List<HelixTimerTask> _controllerTimerTasks;
- public DistributedLeaderElection(AbstractManager manager, GenericHelixController controller) {
+ public DistributedLeaderElection(HelixManager manager, GenericHelixController controller,
+ List<HelixTimerTask> controllerTimerTasks) {
_manager = manager;
_controller = controller;
+ _controllerTimerTasks = controllerTimerTasks;
}
/**
@@ -68,7 +73,8 @@ public class DistributedLeaderElection implements ControllerChangeListener {
return;
}
- ControllerManagerHelper controllerHelper = new ControllerManagerHelper(_manager);
+ ControllerManagerHelper controllerHelper =
+ new ControllerManagerHelper(_manager, _controllerTimerTasks);
try {
if (changeContext.getType().equals(NotificationContext.Type.INIT)
|| changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
@@ -84,7 +90,7 @@ public class DistributedLeaderElection implements ControllerChangeListener {
+ _manager.getClusterName());
updateHistory(manager);
- _manager._baseDataAccessor.reset();
+ _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
controllerHelper.addListenersToController(_controller);
controllerHelper.startControllerTimerTasks();
}
@@ -98,7 +104,7 @@ public class DistributedLeaderElection implements ControllerChangeListener {
/**
* clear write-through cache
*/
- _manager._baseDataAccessor.reset();
+ _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
deleted file mode 100644
index 0af7e77..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ /dev/null
@@ -1,155 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
-import org.apache.helix.healthcheck.ParticipantHealthReportTask;
-import org.apache.helix.participant.HelixStateMachineEngine;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-
-public class ParticipantManager extends AbstractManager {
-
- private static Logger LOG = Logger.getLogger(ParticipantManager.class);
-
- /**
- * state-transition message handler factory for helix-participant
- */
- final StateMachineEngine _stateMachineEngine;
-
- final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
-
- public ParticipantManager(String zkAddress, String clusterName, String instanceName) {
- super(zkAddress, clusterName, instanceName, InstanceType.PARTICIPANT);
-
- _stateMachineEngine = new HelixStateMachineEngine(this);
- _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
-
- _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
- }
-
- @Override
- public ParticipantHealthReportCollector getHealthReportCollector() {
- checkConnected();
- return _participantHealthInfoCollector;
- }
-
- @Override
- public StateMachineEngine getStateMachineEngine() {
- return _stateMachineEngine;
- }
-
- @Override
- public void handleNewSession() {
- waitUntilConnected();
-
- /**
- * stop timer tasks, reset all handlers, make sure cleanup completed for previous session
- * disconnect if cleanup fails
- */
- stopTimerTasks();
- resetHandlers();
-
- /**
- * clear write-through cache
- */
- _baseDataAccessor.reset();
-
- /**
- * from here on, we are dealing with new session
- */
- if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
- throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
- }
-
- /**
- * auto-join
- */
- ParticipantManagerHelper participantHelper =
- new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
- participantHelper.joinCluster();
-
- /**
- * Invoke PreConnectCallbacks
- */
- for (PreConnectCallback callback : _preConnectCallbacks) {
- callback.onPreConnect();
- }
-
- participantHelper.createLiveInstance();
-
- participantHelper.carryOverPreviousCurrentState();
-
- /**
- * setup message listener
- */
- participantHelper.setupMsgHandler();
-
- /**
- * start health check timer task
- */
- participantHelper.createHealthCheckPath();
- startTimerTasks();
-
- /**
- * init handlers
- * ok to init message handler and data-accessor twice
- * the second init will be skipped (see CallbackHandler)
- */
- initHandlers(_handlers);
-
- }
-
- /**
- * helix-participant uses a write-through cache for current-state
- */
- @Override
- BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
- String curStatePath =
- PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName, _instanceName);
- return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(curStatePath));
-
- }
-
- @Override
- public boolean isLeader() {
- return false;
- }
-
- /**
- * disconnect logic for helix-participant
- */
- @Override
- void doDisconnect() {
- // nothing for participant
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
index 70dd592..e7f9efb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceInfoProvider;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.messaging.DefaultMessagingService;
@@ -55,7 +56,7 @@ public class ParticipantManagerHelper {
private static Logger LOG = Logger.getLogger(ParticipantManagerHelper.class);
final ZkClient _zkclient;
- final AbstractManager _manager;
+ final HelixManager _manager;
final PropertyKey.Builder _keyBuilder;
final String _clusterName;
final String _instanceName;
@@ -67,8 +68,10 @@ public class ParticipantManagerHelper {
final ZKHelixDataAccessor _dataAccessor;
final DefaultMessagingService _messagingService;
final StateMachineEngine _stateMachineEngine;
+ final LiveInstanceInfoProvider _liveInstanceInfoProvider;
- public ParticipantManagerHelper(AbstractManager manager, ZkClient zkclient, int sessionTimeout) {
+ public ParticipantManagerHelper(HelixManager manager, ZkClient zkclient, int sessionTimeout,
+ LiveInstanceInfoProvider liveInstanceInfoProvider) {
_zkclient = zkclient;
_manager = manager;
_clusterName = manager.getClusterName();
@@ -82,6 +85,7 @@ public class ParticipantManagerHelper {
_dataAccessor = (ZKHelixDataAccessor) manager.getHelixDataAccessor();
_messagingService = (DefaultMessagingService) manager.getMessagingService();
_stateMachineEngine = manager.getStateMachineEngine();
+ _liveInstanceInfoProvider = liveInstanceInfoProvider;
}
public void joinCluster() {
@@ -92,8 +96,8 @@ public class ParticipantManagerHelper {
new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
_manager.getClusterName()).build();
autoJoin =
- Boolean
- .parseBoolean(_configAccessor.get(scope, HelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
+ Boolean.parseBoolean(_configAccessor.get(scope,
+ ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
} catch (Exception e) {
// autoJoin is false
@@ -128,6 +132,19 @@ public class ParticipantManagerHelper {
liveInstance.setHelixVersion(_manager.getVersion());
liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+ // LiveInstanceInfoProvider liveInstanceInfoProvider = _manager._liveInstanceInfoProvider;
+ if (_liveInstanceInfoProvider != null) {
+ LOG.info("invoke liveInstanceInfoProvider");
+ ZNRecord additionalLiveInstanceInfo =
+ _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
+ if (additionalLiveInstanceInfo != null) {
+ additionalLiveInstanceInfo.merge(liveInstance.getRecord());
+ ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
+ liveInstance = new LiveInstance(mergedLiveInstance);
+ LOG.info("instanceName: " + _instanceName + ", mergedLiveInstance: " + liveInstance);
+ }
+ }
+
boolean retry;
do {
retry = false;
@@ -250,7 +267,7 @@ public class ParticipantManagerHelper {
}
}
- public void setupMsgHandler() {
+ public void setupMsgHandler() throws Exception {
_messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
_stateMachineEngine);
_manager.addMessageListener(_messagingService.getExecutor(), _instanceName);