You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/11/20 22:12:22 UTC
[07/52] [abbrv] [HELIX-279] Apply gc handling fixes to ZKHelixManager
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index afd35e6..b71304a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -19,18 +19,15 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
+import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
@@ -40,12 +37,12 @@ import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HealthStateChangeListener;
import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
import org.apache.helix.HelixTimerTask;
+import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.InstanceConfigChangeListener;
import org.apache.helix.InstanceType;
@@ -59,141 +56,222 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.ScopedConfigChangeListener;
import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.healthcheck.HealthStatsAggregationTask;
import org.apache.helix.healthcheck.HealthStatsAggregator;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.healthcheck.ParticipantHealthReportTask;
import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
-import org.apache.helix.participant.DistClusterControllerElection;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper.States;
+
+public class ZKHelixManager implements HelixManager, IZkStateListener {
+ private static Logger LOG = Logger.getLogger(ZKHelixManager.class);
+
+ public static final int FLAPPING_TIME_WINDIOW = 300000; // Default to 300 sec
+ public static final int MAX_DISCONNECT_THRESHOLD = 5;
+ public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
-public class ZKHelixManager implements HelixManager {
- private static Logger logger = Logger.getLogger(ZKHelixManager.class);
- private static final int RETRY_LIMIT = 3;
- private static final int CONNECTIONTIMEOUT = 60 * 1000;
+ protected final String _zkAddress;
private final String _clusterName;
private final String _instanceName;
- private final String _zkConnectString;
- private static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
- private ZKHelixDataAccessor _helixAccessor;
- private ConfigAccessor _configAccessor;
- protected ZkClient _zkClient;
- protected final List<CallbackHandler> _handlers = new ArrayList<CallbackHandler>();
- private final ZkStateChangeListener _zkStateChangeListener;
private final InstanceType _instanceType;
- volatile String _sessionId;
- private Timer _timer;
- private CallbackHandler _leaderElectionHandler;
- private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector;
- private ParticipantHealthReportTask _participantHealthReportTask;
- private final DefaultMessagingService _messagingService;
- private ZKHelixAdmin _managementTool;
- private final String _version;
+ private final int _sessionTimeout;
+ private final List<PreConnectCallback> _preConnectCallbacks;
+ protected final List<CallbackHandler> _handlers;
private final HelixManagerProperties _properties;
- private final StateMachineEngine _stateMachEngine;
- private int _sessionTimeout;
- private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
- private final List<HelixTimerTask> _controllerTimerTasks;
+
+ /**
+ * helix version#
+ */
+ private final String _version;
+
+ protected ZkClient _zkclient = null;
+ private final DefaultMessagingService _messagingService;
+
private BaseDataAccessor<ZNRecord> _baseDataAccessor;
- List<PreConnectCallback> _preConnectCallbacks = new LinkedList<PreConnectCallback>();
- ZKPropertyTransferServer _transferServer = null;
- int _flappingTimeWindowMs;
- int _maxDisconnectThreshold;
- public static final int FLAPPING_TIME_WINDIOW = 300000; // Default to 300 sec
- public static final int MAX_DISCONNECT_THRESHOLD = 5;
- LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
- public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
+ private ZKHelixDataAccessor _dataAccessor;
+ private final Builder _keyBuilder;
+ private ConfigAccessor _configAccessor;
+ private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
+ protected LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
- public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
- String zkConnectString) {
- logger.info("Create a zk-based cluster manager. clusterName:" + clusterName + ", instanceName:"
- + instanceName + ", type:" + instanceType + ", zkSvr:" + zkConnectString);
- _flappingTimeWindowMs = FLAPPING_TIME_WINDIOW;
- try {
- _flappingTimeWindowMs =
- Integer.parseInt(System.getProperty("helixmanager.flappingTimeWindow", ""
- + FLAPPING_TIME_WINDIOW));
- } catch (NumberFormatException e) {
- logger.warn("Exception while parsing helixmanager.flappingTimeWindow: "
- + System.getProperty("helixmanager.flappingTimeWindow", "" + FLAPPING_TIME_WINDIOW));
- }
- _maxDisconnectThreshold = MAX_DISCONNECT_THRESHOLD;
- try {
- _maxDisconnectThreshold =
- Integer.parseInt(System.getProperty("helixmanager.maxDisconnectThreshold", ""
- + MAX_DISCONNECT_THRESHOLD));
- } catch (NumberFormatException e) {
- logger.warn("Exception while parsing helixmanager.maxDisconnectThreshold: "
- + System
- .getProperty("helixmanager.maxDisconnectThreshold", "" + MAX_DISCONNECT_THRESHOLD));
- }
- int sessionTimeoutInt = -1;
- try {
- sessionTimeoutInt =
- Integer.parseInt(System.getProperty("zk.session.timeout", "" + DEFAULT_SESSION_TIMEOUT));
- } catch (NumberFormatException e) {
- logger.warn("Exception while parsing session timeout: "
- + System.getProperty("zk.session.timeout", "" + DEFAULT_SESSION_TIMEOUT));
+ private volatile String _sessionId;
+
+ /**
+ * Keep track of timestamps that zk State has become Disconnected
+ * If in a _timeWindowLengthMs window zk State has become Disconnected
+ * for more than_maxDisconnectThreshold times disconnect the zkHelixManager
+ */
+ private final List<Long> _disconnectTimeHistory = new ArrayList<Long>();
+ private final int _flappingTimeWindowMs;
+ private final int _maxDisconnectThreshold;
+
+ /**
+ * participant fields
+ */
+ private final StateMachineEngine _stateMachineEngine;
+ private final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
+ private final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>();
+
+ /**
+ * controller fields
+ */
+ private final GenericHelixController _controller = new GenericHelixController();
+ private CallbackHandler _leaderElectionHandler = null;
+ protected final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<HelixTimerTask>();
+
+ /**
+ * status dump timer-task
+ */
+ static class StatusDumpTask extends HelixTimerTask {
+ Timer _timer = null;
+ final ZkClient zkclient;
+ final HelixManager helixController;
+
+ public StatusDumpTask(ZkClient zkclient, HelixManager helixController) {
+ this.zkclient = zkclient;
+ this.helixController = helixController;
+ }
+
+ @Override
+ public void start() {
+ long initialDelay = 30 * 60 * 1000;
+ long period = 120 * 60 * 1000;
+ int timeThresholdNoChange = 180 * 60 * 1000;
+
+ if (_timer == null) {
+ LOG.info("Start StatusDumpTask");
+ _timer = new Timer("StatusDumpTimerTask", true);
+ _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, zkclient,
+ timeThresholdNoChange), initialDelay, period);
+ }
}
- if (sessionTimeoutInt > 0) {
- _sessionTimeout = sessionTimeoutInt;
- } else {
- _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
+
+ @Override
+ public void stop() {
+ if (_timer != null) {
+ LOG.info("Stop StatusDumpTask");
+ _timer.cancel();
+ _timer = null;
+ }
}
+ }
+
+ public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
+ String zkAddress) {
+
+ LOG.info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: "
+ + clusterName + ", instanceName: " + instanceName + ", type: " + instanceType);
+
+ _zkAddress = zkAddress;
+ _clusterName = clusterName;
+ _instanceType = instanceType;
+
if (instanceName == null) {
try {
instanceName =
InetAddress.getLocalHost().getCanonicalHostName() + "-" + instanceType.toString();
} catch (UnknownHostException e) {
// can ignore it
- logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
+ LOG.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
instanceName = "UNKNOWN";
}
}
- _clusterName = clusterName;
_instanceName = instanceName;
- _instanceType = instanceType;
- _zkConnectString = zkConnectString;
- _zkStateChangeListener =
- new ZkStateChangeListener(this, _flappingTimeWindowMs, _maxDisconnectThreshold);
- _timer = null;
+ _preConnectCallbacks = new ArrayList<PreConnectCallback>();
+ _handlers = new ArrayList<CallbackHandler>();
+ _properties = new HelixManagerProperties("cluster-manager-version.properties");
+ _version = _properties.getVersion();
+ _keyBuilder = new Builder(clusterName);
_messagingService = new DefaultMessagingService(this);
- _properties = new HelixManagerProperties("cluster-manager-version.properties");
- _version = _properties.getVersion();
+ /**
+ * use system property if available
+ */
+ _flappingTimeWindowMs =
+ getSystemPropertyAsInt("helixmanager.flappingTimeWindow",
+ ZKHelixManager.FLAPPING_TIME_WINDIOW);
+
+ _maxDisconnectThreshold =
+ getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold",
+ ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
- _stateMachEngine = new HelixStateMachineEngine(this);
+ _sessionTimeout =
+ getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
+
+ /**
+ * instance type specific init
+ */
+ switch (instanceType) {
+ case PARTICIPANT:
+ _stateMachineEngine = new HelixStateMachineEngine(this);
+ _participantHealthInfoCollector =
+ new ParticipantHealthReportCollectorImpl(this, _instanceName);
+
+ _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
+
+ break;
+ case CONTROLLER:
+ _stateMachineEngine = null;
+ _participantHealthInfoCollector = null;
+ _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
+ _controllerTimerTasks.add(new StatusDumpTask(_zkclient, this));
+
+ break;
+ case CONTROLLER_PARTICIPANT:
+ _stateMachineEngine = new HelixStateMachineEngine(this);
+ _participantHealthInfoCollector =
+ new ParticipantHealthReportCollectorImpl(this, _instanceName);
+
+ _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
- // add all timer tasks
- _controllerTimerTasks = new ArrayList<HelixTimerTask>();
- if (_instanceType == InstanceType.CONTROLLER) {
_controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
+ _controllerTimerTasks.add(new StatusDumpTask(_zkclient, this));
+
+ break;
+ case ADMINISTRATOR:
+ case SPECTATOR:
+ _stateMachineEngine = null;
+ _participantHealthInfoCollector = null;
+ break;
+ default:
+ throw new IllegalArgumentException("unrecognized type: " + instanceType);
}
}
+  /**
+   * Reads a strictly positive int from a JVM system property.
+   *
+   * @param propertyKey name of the system property to read
+   * @param propertyDefaultValue value returned when the property is absent, cannot be parsed
+   *          as an int, or is not strictly positive
+   * @return the property value if it parses to a positive int, otherwise propertyDefaultValue
+   */
+  private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
+    String valueString = System.getProperty(propertyKey);
+    if (valueString == null) {
+      // property not set: nothing to report, just use the default
+      return propertyDefaultValue;
+    }
+
+    try {
+      int value = Integer.parseInt(valueString.trim());
+      if (value > 0) {
+        return value;
+      }
+      // a non-positive setting is rejected; report it instead of silently discarding it
+      LOG.warn("Non-positive value for property: " + propertyKey + ", string: " + valueString
+          + ", using default value: " + propertyDefaultValue);
+    } catch (NumberFormatException e) {
+      LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
+          + ", using default value: " + propertyDefaultValue);
+    }
+
+    return propertyDefaultValue;
+  }
+
@Override
public boolean removeListener(PropertyKey key, Object listener) {
- logger.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: "
+ LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: "
+ _clusterName + " by instance: " + _instanceName);
synchronized (this) {
@@ -216,7 +294,13 @@ public class ZKHelixManager implements HelixManager {
return true;
}
- private void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
+  /**
+   * Guards operations that require this manager to be connected.
+   *
+   * @throws HelixException if {@link #isConnected()} reports no connection
+   */
+  void checkConnected() {
+    boolean connected = isConnected();
+    if (connected) {
+      return;
+    }
+    throw new HelixException("HelixManager is not connected. Call HelixManager#connect()");
+  }
+
+ void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
EventType[] eventType) {
checkConnected();
@@ -227,16 +311,18 @@ public class ZKHelixManager implements HelixManager {
// compare property-key path and listener reference
if (handler.getPath().equals(propertyKey.getPath())
&& handler.getListener().equals(listener)) {
- logger.info("Listener: " + listener + " on path: " + propertyKey.getPath()
- + " already exists. skip adding it");
+ LOG.info("Listener: " + listener + " on path: " + propertyKey.getPath()
+ + " already exists. skip add");
+
return;
}
}
CallbackHandler newHandler =
- createCallBackHandler(propertyKey, listener, eventType, changeType);
+ new CallbackHandler(this, _zkclient, propertyKey, listener, eventType, changeType);
+
_handlers.add(newHandler);
- logger.info("Add listener: " + listener + " for type: " + type + " to path: "
+ LOG.info("Added listener: " + listener + " for type: " + type + " to path: "
+ newHandler.getPath());
}
}
@@ -259,7 +345,7 @@ public class ZKHelixManager implements HelixManager {
}
@Override
- public void addConfigChangeListener(ConfigChangeListener listener) {
+ public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
new EventType[] {
EventType.NodeChildrenChanged
@@ -267,7 +353,8 @@ public class ZKHelixManager implements HelixManager {
}
@Override
- public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener) {
+ public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
+ throws Exception {
addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
new EventType[] {
EventType.NodeChildrenChanged
@@ -275,7 +362,8 @@ public class ZKHelixManager implements HelixManager {
}
@Override
- public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope) {
+ public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
+ throws Exception {
Builder keyBuilder = new Builder(_clusterName);
PropertyKey propertyKey = null;
@@ -298,7 +386,7 @@ public class ZKHelixManager implements HelixManager {
EventType.NodeChildrenChanged
});
} else {
- logger.error("Can't add listener to config scope: " + scope);
+ LOG.error("Can't add listener to config scope: " + scope);
}
}
@@ -312,7 +400,8 @@ public class ZKHelixManager implements HelixManager {
});
}
- void addControllerMessageListener(MessageListener listener) {
+ @Override
+ public void addControllerMessageListener(MessageListener listener) {
addListener(listener, new Builder(_clusterName).controllerMessages(),
ChangeType.MESSAGES_CONTROLLER, new EventType[] {
EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
@@ -321,7 +410,7 @@ public class ZKHelixManager implements HelixManager {
@Override
public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
- String instanceName, String sessionId) {
+ String instanceName, String sessionId) throws Exception {
addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId),
ChangeType.CURRENT_STATE, new EventType[] {
EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
@@ -329,7 +418,8 @@ public class ZKHelixManager implements HelixManager {
}
@Override
- public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName) {
+ public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
+ throws Exception {
addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH,
new EventType[] {
EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
@@ -337,7 +427,7 @@ public class ZKHelixManager implements HelixManager {
}
@Override
- public void addExternalViewChangeListener(ExternalViewChangeListener listener) {
+ public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW,
new EventType[] {
EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
@@ -355,7 +445,7 @@ public class ZKHelixManager implements HelixManager {
@Override
public HelixDataAccessor getHelixDataAccessor() {
checkConnected();
- return _helixAccessor;
+ return _dataAccessor;
}
@Override
@@ -374,484 +464,158 @@ public class ZKHelixManager implements HelixManager {
return _instanceName;
}
- @Override
- public void connect() throws Exception {
- logger.info("ClusterManager.connect()");
- if (_zkStateChangeListener.isConnected()) {
- logger.warn("Cluster manager " + _clusterName + " " + _instanceName + " already connected");
- return;
- }
-
- try {
- createClient(_zkConnectString);
- _messagingService.onConnected();
- } catch (Exception e) {
- logger.error(e);
- disconnect();
- throw e;
- }
- }
-
- @Override
- public void disconnect() {
- if (!isConnected()) {
- logger.error("ClusterManager " + _instanceName + " already disconnected");
- return;
- }
- disconnectInternal();
- }
-
- void disconnectInternal() {
- // This function can be called when the connection are in bad state(e.g. flapping),
- // in which isConnected() could be false and we want to disconnect from cluster.
- logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName);
-
- /**
- * shutdown thread pool first to avoid reset() being invoked in the middle of state
- * transition
- */
- _messagingService.getExecutor().shutdown();
- resetHandlers();
-
- _helixAccessor.shutdown();
-
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.reset();
- }
-
- if (_participantHealthCheckInfoCollector != null) {
- _participantHealthReportTask.stop();
- }
-
- if (_timer != null) {
- _timer.cancel();
- _timer = null;
- }
-
- if (_instanceType == InstanceType.CONTROLLER) {
- stopTimerTasks();
- }
-
- // unsubscribe accessor from controllerChange
- _zkClient.unsubscribeAll();
-
- _zkClient.close();
-
- // HACK seems that zkClient is not sending DISCONNECT event
- _zkStateChangeListener.disconnect();
- logger.info("Cluster manager: " + _instanceName + " disconnected");
-
- }
-
- @Override
- public String getSessionId() {
- checkConnected();
- return _sessionId;
- }
-
- @Override
- public boolean isConnected() {
- return _zkStateChangeListener.isConnected();
- }
-
- @Override
- public long getLastNotificationTime() {
- return -1;
- }
-
- private void addLiveInstance() {
- LiveInstance liveInstance = new LiveInstance(_instanceName);
- liveInstance.setSessionId(_sessionId);
- liveInstance.setHelixVersion(_version);
- liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
-
- if (_liveInstanceInfoProvider != null) {
- logger.info("invoking _liveInstanceInfoProvider");
- ZNRecord additionalLiveInstanceInfo =
- _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
- if (additionalLiveInstanceInfo != null) {
- additionalLiveInstanceInfo.merge(liveInstance.getRecord());
- ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
- liveInstance = new LiveInstance(mergedLiveInstance);
- logger.info("liveInstance content :" + _instanceName + " " + liveInstance.toString());
- }
- }
-
- logger.info("Add live instance: InstanceName: " + _instanceName + " Session id:" + _sessionId);
- Builder keyBuilder = _helixAccessor.keyBuilder();
- if (!_helixAccessor.createProperty(keyBuilder.liveInstance(_instanceName), liveInstance)) {
- String errorMsg =
- "Fail to create live instance node after waiting, so quit. instance:" + _instanceName;
- logger.warn(errorMsg);
- throw new HelixException(errorMsg);
-
- }
- String currentStatePathParent =
- PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName, _instanceName,
- getSessionId());
-
- if (!_zkClient.exists(currentStatePathParent)) {
- _zkClient.createPersistent(currentStatePathParent);
- logger.info("Creating current state path " + currentStatePathParent);
- }
- }
-
- private void startStatusUpdatedumpTask() {
- long initialDelay = 30 * 60 * 1000;
- long period = 120 * 60 * 1000;
- int timeThresholdNoChange = 180 * 60 * 1000;
+ BaseDataAccessor<ZNRecord> createBaseDataAccessor() {
+ ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
- if (_timer == null) {
- _timer = new Timer(true);
- _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(this, _zkClient, timeThresholdNoChange),
- initialDelay, period);
- }
+ return baseDataAccessor;
}
- private void createClient(String zkServers) throws Exception {
- // by default use ZNRecordStreamingSerializer except for paths within the property
- // store which expects raw byte[] serialization/deserialization
+ void createClient() throws Exception {
PathBasedZkSerializer zkSerializer =
ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
- _zkClient = new ZkClient(zkServers, _sessionTimeout, CONNECTIONTIMEOUT, zkSerializer);
+ _zkclient =
+ new ZkClient(_zkAddress, _sessionTimeout, ZkClient.DEFAULT_CONNECTION_TIMEOUT, zkSerializer);
- ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
- if (_instanceType == InstanceType.PARTICIPANT) {
- String curStatePath =
- PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName, _instanceName);
- _baseDataAccessor =
- new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(curStatePath));
- } else if (_instanceType == InstanceType.CONTROLLER) {
- String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,
+ _baseDataAccessor = createBaseDataAccessor();
- _clusterName);
- _baseDataAccessor =
- new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(extViewPath));
+ _dataAccessor = new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
+ _configAccessor = new ConfigAccessor(_zkclient);
- } else {
- _baseDataAccessor = baseDataAccessor;
- }
-
- _helixAccessor = new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
- _configAccessor = new ConfigAccessor(_zkClient);
int retryCount = 0;
- _zkClient.subscribeStateChanges(_zkStateChangeListener);
- while (retryCount < RETRY_LIMIT) {
+ _zkclient.subscribeStateChanges(this);
+ while (retryCount < 3) {
try {
- _zkClient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
- _zkStateChangeListener.handleStateChanged(KeeperState.SyncConnected);
- _zkStateChangeListener.handleNewSession();
+ _zkclient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
+ handleStateChanged(KeeperState.SyncConnected);
+ handleNewSession();
break;
} catch (HelixException e) {
- logger.error("fail to createClient.", e);
+ LOG.error("fail to createClient.", e);
throw e;
} catch (Exception e) {
retryCount++;
- logger.error("fail to createClient. retry " + retryCount, e);
- if (retryCount == RETRY_LIMIT) {
+ LOG.error("fail to createClient. retry " + retryCount, e);
+ if (retryCount == 3) {
throw e;
}
}
}
}
- private CallbackHandler createCallBackHandler(PropertyKey propertyKey, Object listener,
- EventType[] eventTypes, ChangeType changeType) {
- if (listener == null) {
- throw new HelixException("Listener cannot be null");
+ @Override
+ public void connect() throws Exception {
+ LOG.info("ClusterManager.connect()");
+ if (isConnected()) {
+ LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName
+ + " already connected. skip connect");
+ return;
}
- return new CallbackHandler(this, _zkClient, propertyKey, listener, eventTypes, changeType);
- }
-
- /**
- * This will be invoked when ever a new session is created<br/>
- * case 1: the cluster manager was a participant carry over current state, add live
- * instance, and invoke message listener; case 2: the cluster manager was controller and
- * was a leader before do leader election, and if it becomes leader again, invoke ideal
- * state listener, current state listener, etc. if it fails to become leader in the new
- * session, then becomes standby; case 3: the cluster manager was controller and was NOT
- * a leader before do leader election, and if it becomes leader, instantiate and invoke
- * ideal state listener, current state listener, etc. if if fails to become leader in
- * the new session, stay as standby
- */
- protected void handleNewSession() {
- boolean isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
- while (!isConnected) {
- logger.error("Could NOT connect to zk server in " + CONNECTIONTIMEOUT + "ms. zkServer: "
- + _zkConnectString + ", expiredSessionId: " + _sessionId + ", clusterName: "
- + _clusterName);
- isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
+ try {
+ createClient();
+ _messagingService.onConnected();
+ } catch (Exception e) {
+ LOG.error("fail to connect " + _instanceName, e);
+ disconnect();
+ throw e;
}
+ }
- ZkConnection zkConnection = ((ZkConnection) _zkClient.getConnection());
-
- synchronized (this) {
- _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
+ @Override
+ public void disconnect() {
+ if (_zkclient == null) {
+ LOG.info("instanceName: " + _instanceName + " already disconnected");
+ return;
}
- _baseDataAccessor.reset();
-
- // reset all handlers so they have a chance to unsubscribe zk changes from zkclient
- // abandon all callback-handlers added in expired session
- resetHandlers();
- logger.info("Handling new session, session id:" + _sessionId + ", instance:" + _instanceName
- + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
+ LOG.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName);
- logger.info(zkConnection.getZookeeper());
-
- if (!ZKUtil.isClusterSetup(_clusterName, _zkClient)) {
- throw new HelixException("Initial cluster structure is not set up for cluster:"
- + _clusterName);
- }
- // Read cluster config and see if instance can auto join the cluster
- boolean autoJoin = false;
try {
- HelixConfigScope scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(getClusterName())
- .build();
- autoJoin = Boolean.parseBoolean(getConfigAccessor().get(scope, ALLOW_PARTICIPANT_AUTO_JOIN));
- logger.info("Auto joining " + _clusterName + " is true");
- } catch (Exception e) {
- }
- if (!ZKUtil.isInstanceSetup(_zkClient, _clusterName, _instanceName, _instanceType)) {
- if (!autoJoin) {
- throw new HelixException("Initial cluster structure is not set up for instance:"
- + _instanceName + " instanceType:" + _instanceType);
- } else {
- logger.info("Auto joining instance " + _instanceName);
- InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
- String hostName = _instanceName;
- String port = "";
- int lastPos = _instanceName.lastIndexOf("_");
- if (lastPos > 0) {
- hostName = _instanceName.substring(0, lastPos);
- port = _instanceName.substring(lastPos + 1);
- }
- instanceConfig.setHostName(hostName);
- instanceConfig.setPort(port);
- instanceConfig.setInstanceEnabled(true);
- getClusterManagmentTool().addInstance(_clusterName, instanceConfig);
- }
- }
+ /**
+ * stop all timer tasks
+ */
+ stopTimerTasks();
- if (_instanceType == InstanceType.PARTICIPANT
- || _instanceType == InstanceType.CONTROLLER_PARTICIPANT) {
- handleNewSessionAsParticipant();
- }
+ /**
+ * shutdown thread pool first to avoid reset() being invoked in the middle of state
+ * transition
+ */
+ _messagingService.getExecutor().shutdown();
+
+ // TODO reset user defined handlers only
+ resetHandlers();
- if (_instanceType == InstanceType.CONTROLLER
- || _instanceType == InstanceType.CONTROLLER_PARTICIPANT) {
- addControllerMessageListener(_messagingService.getExecutor());
- MessageHandlerFactory defaultControllerMsgHandlerFactory =
- new DefaultControllerMessageHandlerFactory();
- _messagingService.getExecutor().registerMessageHandlerFactory(
- defaultControllerMsgHandlerFactory.getMessageType(), defaultControllerMsgHandlerFactory);
- MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
- new DefaultSchedulerMessageHandlerFactory(this);
- _messagingService.getExecutor().registerMessageHandlerFactory(
- defaultSchedulerMsgHandlerFactory.getMessageType(), defaultSchedulerMsgHandlerFactory);
- MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
- new DefaultParticipantErrorMessageHandlerFactory(this);
- _messagingService.getExecutor().registerMessageHandlerFactory(
- defaultParticipantErrorMessageHandlerFactory.getMessageType(),
- defaultParticipantErrorMessageHandlerFactory);
+ _dataAccessor.shutdown();
if (_leaderElectionHandler != null) {
_leaderElectionHandler.reset();
- _leaderElectionHandler.init();
- } else {
- _leaderElectionHandler =
- createCallBackHandler(new Builder(_clusterName).controller(),
- new DistClusterControllerElection(_zkConnectString), new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- }, ChangeType.CONTROLLER);
}
- }
- if (_instanceType == InstanceType.PARTICIPANT
- || _instanceType == InstanceType.CONTROLLER_PARTICIPANT
- || (_instanceType == InstanceType.CONTROLLER && isLeader())) {
- initHandlers();
+ } finally {
+ _zkclient.close();
+ _zkclient = null;
+ LOG.info("Cluster manager: " + _instanceName + " disconnected");
}
}
- private void handleNewSessionAsParticipant() {
- // In case there is a live instance record on zookeeper
- Builder keyBuilder = _helixAccessor.keyBuilder();
-
- if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null) {
- logger.warn("Found another instance with same instanceName: " + _instanceName
- + " in cluster " + _clusterName);
- // Wait for a while, in case previous storage node exits unexpectedly
- // and its liveinstance
- // still hangs around until session timeout happens
- try {
- Thread.sleep(_sessionTimeout + 5000);
- } catch (InterruptedException e) {
- logger.warn("Sleep interrupted while waiting for previous liveinstance to go away.", e);
- }
+  /**
+   * Returns the session id currently recorded by this manager.
+   *
+   * @return the value of the session-id field
+   * @throws HelixException if the manager is not connected (via checkConnected())
+   */
+  @Override
+  public String getSessionId() {
+    checkConnected();
+    String currentSessionId = _sessionId;
+    return currentSessionId;
+  }
- if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null) {
- String errorMessage =
- "instance " + _instanceName + " already has a liveinstance in cluster " + _clusterName;
- logger.error(errorMessage);
- throw new HelixException(errorMessage);
- }
+ @Override
+ public boolean isConnected() {
+ if (_zkclient == null) {
+ return false;
}
- // Invoke the PreConnectCallbacks
- for (PreConnectCallback callback : _preConnectCallbacks) {
- callback.onPreConnect();
+ ZkConnection zkconnection = (ZkConnection) _zkclient.getConnection();
+ if (zkconnection != null) {
+ States state = zkconnection.getZookeeperState();
+ return state == States.CONNECTED;
}
- addLiveInstance();
- carryOverPreviousCurrentState();
-
- // In case the cluster manager is running as a participant, setup message
- // listener
- _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
- _stateMachEngine);
- addMessageListener(_messagingService.getExecutor(), _instanceName);
- addControllerListener(_helixAccessor);
-
- ScheduledTaskStateModelFactory stStateModelFactory =
- new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
- _stateMachEngine.registerStateModelFactory(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
+ return false;
+ }
- if (_participantHealthCheckInfoCollector == null) {
- _participantHealthCheckInfoCollector =
- new ParticipantHealthReportCollectorImpl(this, _instanceName);
- _participantHealthReportTask =
- new ParticipantHealthReportTask(_participantHealthCheckInfoCollector);
- _participantHealthReportTask.start();
- }
- // start the participant health check timer, also create zk path for health
- // check info
- String healthCheckInfoPath = _helixAccessor.keyBuilder().healthReports(_instanceName).getPath();
- if (!_zkClient.exists(healthCheckInfoPath)) {
- _zkClient.createPersistent(healthCheckInfoPath, true);
- logger.info("Creating healthcheck info path " + healthCheckInfoPath);
- }
+ @Override
+ public long getLastNotificationTime() {
+ return 0;
}
@Override
public void addPreConnectCallback(PreConnectCallback callback) {
- logger.info("Adding preconnect callback");
+ LOG.info("Adding preconnect callback: " + callback);
_preConnectCallbacks.add(callback);
}
- private void resetHandlers() {
- synchronized (this) {
- if (_handlers != null) {
- // get a copy of the list and iterate over the copy list
- // in case handler.reset() will modify the original handler list
- List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
- tmpHandlers.addAll(_handlers);
-
- for (CallbackHandler handler : tmpHandlers) {
- handler.reset();
- logger.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
- }
- }
- }
- }
-
- private void initHandlers() {
- synchronized (this) {
- if (_handlers != null) {
- // may add new currentState and message listeners during init()
- // so make a copy and iterate over the copy
- List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
- tmpHandlers.addAll(_handlers);
- for (CallbackHandler handler : tmpHandlers) {
- handler.init();
- logger.info("init handler: " + handler.getPath() + ", " + handler.getListener());
- }
- }
- }
- }
-
@Override
public boolean isLeader() {
- if (!isConnected()) {
+ if (_instanceType != InstanceType.CONTROLLER
+ && _instanceType != InstanceType.CONTROLLER_PARTICIPANT) {
return false;
}
- if (_instanceType != InstanceType.CONTROLLER) {
- return false;
- }
-
- Builder keyBuilder = _helixAccessor.keyBuilder();
- LiveInstance leader = _helixAccessor.getProperty(keyBuilder.controllerLeader());
- if (leader == null) {
+ if (!isConnected()) {
return false;
- } else {
- String leaderName = leader.getInstanceName();
- // TODO need check sessionId also, but in distributed mode, leader's
- // sessionId is
- // not equal to
- // the leader znode's sessionId field which is the sessionId of the
- // controller_participant that
- // successfully creates the leader node
- if (leaderName == null || !leaderName.equals(_instanceName)) {
- return false;
- }
}
- return true;
- }
-
- /**
- * carry over current-states from last sessions
- * set to initial state for current session only when the state doesn't exist in current session
- */
- private void carryOverPreviousCurrentState() {
- Builder keyBuilder = _helixAccessor.keyBuilder();
- List<String> sessions = _helixAccessor.getChildNames(keyBuilder.sessions(_instanceName));
- // carry-over
- for (String session : sessions) {
- if (session.equals(_sessionId)) {
- continue;
- }
-
- List<CurrentState> lastCurStates =
- _helixAccessor.getChildValues(keyBuilder.currentStates(_instanceName, session));
-
- for (CurrentState lastCurState : lastCurStates) {
- logger.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId()
- + " to current session: " + _sessionId);
- String stateModelDefRef = lastCurState.getStateModelDefRef();
- if (stateModelDefRef == null) {
- logger
- .error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
- + lastCurState);
- continue;
+ try {
+ LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
+ if (leader != null) {
+ String leaderName = leader.getInstanceName();
+ String sessionId = leader.getSessionId();
+ if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
+ && sessionId.equals(_sessionId)) {
+ return true;
}
- StateModelDefinition stateModel =
- _helixAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
-
- String curStatePath =
- keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName())
- .getPath();
- _helixAccessor.getBaseDataAccessor().update(curStatePath,
- new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState),
- AccessOption.PERSISTENT);
- }
- }
-
- // remove previous current states
- for (String session : sessions) {
- if (session.equals(_sessionId)) {
- continue;
}
-
- String path = _helixAccessor.keyBuilder().currentStates(_instanceName, session).getPath();
- logger.info("Removing current states from previous sessions. path: " + path);
- _zkClient.deleteRecursive(path);
+ } catch (Exception e) {
+ // log
}
+ return false;
}
@Override
@@ -862,7 +626,7 @@ public class ZKHelixManager implements HelixManager {
String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
_helixPropertyStore =
- new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkClient), path,
+ new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient), path,
null);
}
@@ -872,13 +636,12 @@ public class ZKHelixManager implements HelixManager {
@Override
public synchronized HelixAdmin getClusterManagmentTool() {
checkConnected();
- if (_zkClient != null) {
- _managementTool = new ZKHelixAdmin(_zkClient);
- } else {
- logger.error("Couldn't get ZKClusterManagementTool because zkClient is null");
+ if (_zkclient != null) {
+ return new ZKHelixAdmin(_zkclient);
}
- return _managementTool;
+ LOG.error("Couldn't get ZKClusterManagementTool because zkclient is null");
+ return null;
}
@Override
@@ -891,7 +654,7 @@ public class ZKHelixManager implements HelixManager {
@Override
public ParticipantHealthReportCollector getHealthReportCollector() {
checkConnected();
- return _participantHealthCheckInfoCollector;
+ return _participantHealthInfoCollector;
}
@Override
@@ -899,12 +662,6 @@ public class ZKHelixManager implements HelixManager {
return _instanceType;
}
- private void checkConnected() {
- if (!isConnected()) {
- throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
- }
- }
-
@Override
public String getVersion() {
return _version;
@@ -917,21 +674,20 @@ public class ZKHelixManager implements HelixManager {
@Override
public StateMachineEngine getStateMachineEngine() {
- return _stateMachEngine;
+ return _stateMachineEngine;
}
// TODO: rename this and not expose this function as part of interface
@Override
public void startTimerTasks() {
- for (HelixTimerTask task : _controllerTimerTasks) {
+ for (HelixTimerTask task : _timerTasks) {
task.start();
}
- startStatusUpdatedumpTask();
}
@Override
public void stopTimerTasks() {
- for (HelixTimerTask task : _controllerTimerTasks) {
+ for (HelixTimerTask task : _timerTasks) {
task.stop();
}
}
@@ -940,4 +696,209 @@ public class ZKHelixManager implements HelixManager {
public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
_liveInstanceInfoProvider = liveInstanceInfoProvider;
}
+
+ /**
+ * wait until we get a non-zero session-id. note that we might lose zkconnection
+ * right after we read session-id. but it's ok to get stale session-id and we will have
+ * another handle-new-session callback to correct this.
+ */
+ void waitUntilConnected() {
+ boolean isConnected;
+ do {
+ isConnected =
+ _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!isConnected) {
+ LOG.error("fail to connect zkserver: " + _zkAddress + " in "
+ + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
+ + ", clusterName: " + _clusterName);
+ continue;
+ }
+
+ ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection());
+ _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
+
+ /**
+ * at the time we read session-id, zkconnection might be lost again
+ * wait until we get a non-zero session-id
+ */
+ } while ("0".equals(_sessionId));
+
+ LOG.info("Handling new session, session id: " + _sessionId + ", instance: " + _instanceName
+ + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName + ", zkconnection: "
+ + ((ZkConnection) _zkclient.getConnection()).getZookeeper());
+ }
+
+ void initHandlers(List<CallbackHandler> handlers) {
+ synchronized (this) {
+ if (handlers != null) {
+ for (CallbackHandler handler : handlers) {
+ handler.init();
+ LOG.info("init handler: " + handler.getPath() + ", " + handler.getListener());
+ }
+ }
+ }
+ }
+
+ void resetHandlers() {
+ synchronized (this) {
+ if (_handlers != null) {
+ // get a copy of the list and iterate over the copy list
+ // in case handler.reset() modify the original handler list
+ List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
+ tmpHandlers.addAll(_handlers);
+
+ for (CallbackHandler handler : tmpHandlers) {
+ handler.reset();
+ LOG.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
+ }
+ }
+ }
+ }
+
+ /**
+ * If zk state has changed into Disconnected for _maxDisconnectThreshold times during previous
+ * _timeWindowLengthMs Ms
+ * time window, we think that there are something wrong going on and disconnect the zkHelixManager
+ * from zk.
+ */
+ boolean isFlapping() {
+ if (_disconnectTimeHistory.size() == 0) {
+ return false;
+ }
+ long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
+
+ // Remove disconnect history timestamp that are older than _flappingTimeWindowMs ago
+ while ((_disconnectTimeHistory.get(0) + _flappingTimeWindowMs) < mostRecentTimestamp) {
+ _disconnectTimeHistory.remove(0);
+ }
+ return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
+ }
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception {
+ switch (state) {
+ case SyncConnected:
+ ZkConnection zkConnection = (ZkConnection) _zkclient.getConnection();
+ LOG.info("KeeperState: " + state + ", zookeeper:" + zkConnection.getZookeeper());
+ break;
+ case Disconnected:
+ LOG.info("KeeperState:" + state + ", disconnectedSessionId: " + _sessionId + ", instance: "
+ + _instanceName + ", type: " + _instanceType);
+
+ /**
+ * Track the time stamp that the disconnected happens, then check history and see if
+ * we should disconnect the helix-manager
+ */
+ _disconnectTimeHistory.add(System.currentTimeMillis());
+ if (isFlapping()) {
+ LOG.error("instanceName: " + _instanceName + " is flapping. diconnect it. "
+ + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
+ + _flappingTimeWindowMs + "ms.");
+ disconnect();
+ }
+ break;
+ case Expired:
+ LOG.info("KeeperState:" + state + ", expiredSessionId: " + _sessionId + ", instance: "
+ + _instanceName + ", type: " + _instanceType);
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public void handleNewSession() throws Exception {
+ waitUntilConnected();
+
+ /**
+ * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session
+ * disconnect if fail to cleanup
+ */
+ stopTimerTasks();
+ if (_leaderElectionHandler != null) {
+ _leaderElectionHandler.reset();
+ }
+ resetHandlers();
+
+ /**
+ * clean up write-through cache
+ */
+ _baseDataAccessor.reset();
+
+ /**
+ * from here on, we are dealing with new session
+ */
+ if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
+ throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
+ }
+
+ switch (_instanceType) {
+ case PARTICIPANT:
+ handleNewSessionAsParticipant();
+ break;
+ case CONTROLLER:
+ handleNewSessionAsController();
+ break;
+ case CONTROLLER_PARTICIPANT:
+ handleNewSessionAsParticipant();
+ handleNewSessionAsController();
+ break;
+ case ADMINISTRATOR:
+ case SPECTATOR:
+ default:
+ break;
+ }
+
+ startTimerTasks();
+
+ /**
+ * init handlers
+ * ok to init message handler and data-accessor twice
+ * the second init will be skipped (see CallbackHandler)
+ */
+ initHandlers(_handlers);
+ }
+
+ void handleNewSessionAsParticipant() throws Exception {
+ /**
+ * auto-join
+ */
+ ParticipantManagerHelper participantHelper =
+ new ParticipantManagerHelper(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider);
+ participantHelper.joinCluster();
+
+ /**
+ * Invoke PreConnectCallbacks
+ */
+ for (PreConnectCallback callback : _preConnectCallbacks) {
+ callback.onPreConnect();
+ }
+
+ participantHelper.createLiveInstance();
+ participantHelper.carryOverPreviousCurrentState();
+
+ /**
+ * setup message listener
+ */
+ participantHelper.setupMsgHandler();
+
+ /**
+ * start health check timer task
+ */
+ participantHelper.createHealthCheckPath();
+ }
+
+ void handleNewSessionAsController() {
+ if (_leaderElectionHandler != null) {
+ _leaderElectionHandler.init();
+ } else {
+ _leaderElectionHandler =
+ new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
+ new DistributedLeaderElection(this, _controller, _controllerTimerTasks),
+ new EventType[] {
+ EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+ }, ChangeType.CONTROLLER);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
index 5865863..755ca52 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
@@ -143,8 +143,7 @@ public class ZkAsyncCallbacks {
}
}
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Interrupted waiting for success", e);
}
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
deleted file mode 100644
index e11444b..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-public class ZkStateChangeListener implements IZkStateListener {
- private volatile boolean _isConnected;
- private volatile boolean _hasSessionExpired;
- private final ZKHelixManager _zkHelixManager;
-
- // Keep track of timestamps that zk State has become Disconnected
- // If in a _timeWindowLengthMs window zk State has become Disconnected
- // for more than_maxDisconnectThreshold times disconnect the zkHelixManager
- List<Long> _disconnectTimeHistory = new LinkedList<Long>();
- int _timeWindowLengthMs;
- int _maxDisconnectThreshold;
-
- private static Logger logger = Logger.getLogger(ZkStateChangeListener.class);
-
- public ZkStateChangeListener(ZKHelixManager zkHelixManager, int timeWindowLengthMs,
- int maxDisconnectThreshold) {
- this._zkHelixManager = zkHelixManager;
- _timeWindowLengthMs = timeWindowLengthMs;
- // _maxDisconnectThreshold min value is 1.
- // We don't want to disconnect from zk for the first time zkState become Disconnected
- _maxDisconnectThreshold = maxDisconnectThreshold > 0 ? maxDisconnectThreshold : 1;
- }
-
- @Override
- public void handleNewSession() {
- // TODO:bug in zkclient .
- // zkclient does not invoke handleStateChanged when a session expires but
- // directly invokes handleNewSession
- _isConnected = true;
- _hasSessionExpired = false;
- _zkHelixManager.handleNewSession();
- }
-
- @Override
- public void handleStateChanged(KeeperState keeperState) throws Exception {
- switch (keeperState) {
- case SyncConnected:
- ZkConnection zkConnection = ((ZkConnection) _zkHelixManager._zkClient.getConnection());
- logger.info("KeeperState: " + keeperState + ", zookeeper:" + zkConnection.getZookeeper());
- _isConnected = true;
- break;
- case Disconnected:
- logger.info("KeeperState:" + keeperState + ", disconnectedSessionId: "
- + _zkHelixManager._sessionId + ", instance: " + _zkHelixManager.getInstanceName()
- + ", type: " + _zkHelixManager.getInstanceType());
-
- _isConnected = false;
- // Track the time stamp that the disconnected happens, then check history and see if
- // we should disconnect the _zkHelixManager
- _disconnectTimeHistory.add(System.currentTimeMillis());
- if (isFlapping()) {
- logger.error("isFlapping() returns true, so disconnect the helix manager. "
- + _zkHelixManager.getInstanceName() + " " + _maxDisconnectThreshold
- + " disconnects in " + _timeWindowLengthMs + " Ms.");
- _zkHelixManager.disconnectInternal();
- }
- break;
- case Expired:
- logger.info("KeeperState:" + keeperState + ", expiredSessionId: "
- + _zkHelixManager._sessionId + ", instance: " + _zkHelixManager.getInstanceName()
- + ", type: " + _zkHelixManager.getInstanceType());
-
- _isConnected = false;
- _hasSessionExpired = true;
- break;
- }
- }
-
- boolean isConnected() {
- return _isConnected;
- }
-
- void disconnect() {
- _isConnected = false;
- }
-
- boolean hasSessionExpired() {
- return _hasSessionExpired;
- }
-
- /**
- * If zk state has changed into Disconnected for _maxDisconnectThreshold times during previous
- * _timeWindowLengthMs Ms
- * time window, we think that there are something wrong going on and disconnect the zkHelixManager
- * from zk.
- */
- boolean isFlapping() {
- if (_disconnectTimeHistory.size() == 0) {
- return false;
- }
- long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
- // Remove disconnect history timestamp that are older than _timeWindowLengthMs ago
- while ((_disconnectTimeHistory.get(0) + _timeWindowLengthMs) < mostRecentTimestamp) {
- _disconnectTimeHistory.remove(0);
- }
- return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index 73b69a8..422d35e 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -117,9 +117,6 @@ public class DefaultMessagingService implements ClusterMessagingService {
Builder keyBuilder = accessor.keyBuilder();
if (receiverType == InstanceType.CONTROLLER) {
- // _manager.getDataAccessor().setProperty(PropertyType.MESSAGES_CONTROLLER,
- // tempMessage,
- // tempMessage.getId());
accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()), tempMessage);
}
@@ -137,6 +134,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
return totalMessageCount;
}
+ @Override
public Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
final Message message) {
Map<InstanceType, List<Message>> messagesToSendMap = new HashMap<InstanceType, List<Message>>();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 8672e7e..6da3dc2 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -32,7 +32,6 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
/**
@@ -94,20 +93,18 @@ public class ResourceAssignment extends HelixProperty {
/**
* Get the participant, state pairs for a partition
* @param partition the Partition to look up
- * @return immutable map of (participant id, state)
+ * @return map of (participant id, state)
*/
public Map<ParticipantId, State> getReplicaMap(PartitionId partitionId) {
Map<String, String> rawReplicaMap = _record.getMapField(partitionId.stringify());
- if (rawReplicaMap == null) {
- return Collections.emptyMap();
- }
- ImmutableMap.Builder<ParticipantId, State> builder =
- new ImmutableMap.Builder<ParticipantId, State>();
- for (String participantName : rawReplicaMap.keySet()) {
- builder.put(ParticipantId.from(participantName),
- State.from(rawReplicaMap.get(participantName)));
+ Map<ParticipantId, State> replicaMap = Maps.newHashMap();
+ if (rawReplicaMap != null) {
+ for (String participantName : rawReplicaMap.keySet()) {
+ replicaMap.put(ParticipantId.from(participantName),
+ State.from(rawReplicaMap.get(participantName)));
+ }
}
- return builder.build();
+ return replicaMap;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 0558764..919eb00 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -472,6 +472,12 @@ public class Mocks {
return _properties;
}
+ @Override
+ public void addControllerMessageListener(MessageListener listener) {
+ // TODO Auto-generated method stub
+
+ }
+
}
public static class MockAccessor implements HelixDataAccessor // DataAccessor
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 4bdd423..871d717 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -50,7 +50,6 @@ import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.integration.manager.ZkTestManager;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -65,8 +64,6 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
-import org.apache.helix.participant.DistClusterControllerStateModelFactory;
-import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.ZNode;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.util.ZKClientPool;
@@ -131,7 +128,7 @@ public class TestHelper {
try {
zkClient.deleteRecursive(rootNamespace);
} catch (Exception e) {
- LOG.error("fail to deleteRecursive path:" + rootNamespace + "\nexception:" + e);
+ LOG.error("fail to deleteRecursive path:" + rootNamespace, e);
}
}
}
@@ -152,90 +149,6 @@ public class TestHelper {
}
}
- public static StartCMResult startDummyProcess(final String zkAddr, final String clusterName,
- final String instanceName) throws Exception {
- StartCMResult result = new StartCMResult();
- ZkHelixTestManager manager = null;
- manager = new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
- result._manager = manager;
- Thread thread = new Thread(new DummyProcessThread(manager, instanceName));
- result._thread = thread;
- thread.start();
-
- return result;
- }
-
- private static ZkHelixTestManager startHelixController(final String zkConnectString,
- final String clusterName, final String controllerName, final String controllerMode) {
- ZkHelixTestManager manager = null;
- try {
- if (controllerMode.equalsIgnoreCase(HelixControllerMain.STANDALONE)) {
- manager =
- new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER,
- zkConnectString);
- manager.connect();
- } else if (controllerMode.equalsIgnoreCase(HelixControllerMain.DISTRIBUTED)) {
- manager =
- new ZkHelixTestManager(clusterName, controllerName,
- InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
-
- DistClusterControllerStateModelFactory stateModelFactory =
- new DistClusterControllerStateModelFactory(zkConnectString);
-
- StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
- manager.connect();
- } else {
- LOG.error("cluster controller mode:" + controllerMode + " NOT supported");
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- return manager;
- }
-
- // TODO refactor this
- public static StartCMResult startController(final String clusterName,
- final String controllerName, final String zkConnectString, final String controllerMode)
- throws Exception {
- final StartCMResult result = new StartCMResult();
- final ZkHelixTestManager manager =
- startHelixController(zkConnectString, clusterName, controllerName, controllerMode);
- result._manager = manager;
-
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- // ClusterManager manager = null;
-
- try {
-
- Thread.currentThread().join();
- } catch (InterruptedException e) {
- String msg =
- "controller:" + controllerName + ", " + Thread.currentThread().getName()
- + " interrupted";
- LOG.info(msg);
- // System.err.println(msg);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
-
- thread.start();
- result._thread = thread;
- return result;
- }
-
- public static class StartCMResult {
- public Thread _thread;
- public ZkHelixTestManager _manager;
-
- }
-
public static void setupEmptyCluster(ZkClient zkClient, String clusterName) {
ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
admin.addCluster(clusterName, true);
@@ -251,11 +164,6 @@ public class TestHelper {
return set;
}
- // public static void verifyWithTimeout(String verifierName, Object... args)
- // {
- // verifyWithTimeout(verifierName, 30 * 1000, args);
- // }
-
/**
* generic method for verification with a timeout
* @param verifierName
@@ -292,8 +200,7 @@ public class TestHelper {
Assert.assertTrue(result);
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Exception in verify: " + verifierName, e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java b/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
index 37c68a2..0decbd8 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -34,6 +35,8 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestZkClientWrapper extends ZkUnitTestBase {
+ private static Logger LOG = Logger.getLogger(TestZkClientWrapper.class);
+
ZkClient _zkClient;
@BeforeClass
@@ -69,7 +72,7 @@ public class TestZkClientWrapper extends ZkUnitTestBase {
}
@Test()
- void testSessioExpire() {
+ void testSessioExpire() throws Exception {
IZkStateListener listener = new IZkStateListener() {
@Override
@@ -82,31 +85,27 @@ public class TestZkClientWrapper extends ZkUnitTestBase {
System.out.println("In Old connection New session");
}
};
+
_zkClient.subscribeStateChanges(listener);
ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
ZooKeeper zookeeper = connection.getZookeeper();
System.out.println("old sessionId= " + zookeeper.getSessionId());
- try {
- Watcher watcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- System.out.println("In New connection In process event:" + event);
- }
- };
- ZooKeeper newZookeeper =
- new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
- zookeeper.getSessionId(), zookeeper.getSessionPasswd());
- Thread.sleep(3000);
- System.out.println("New sessionId= " + newZookeeper.getSessionId());
- Thread.sleep(3000);
- newZookeeper.close();
- Thread.sleep(10000);
- connection = ((ZkConnection) _zkClient.getConnection());
- zookeeper = connection.getZookeeper();
- System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ System.out.println("In New connection In process event:" + event);
+ }
+ };
+ ZooKeeper newZookeeper =
+ new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
+ zookeeper.getSessionId(), zookeeper.getSessionPasswd());
+ Thread.sleep(3000);
+ System.out.println("New sessionId= " + newZookeeper.getSessionId());
+ Thread.sleep(3000);
+ newZookeeper.close();
+ Thread.sleep(10000);
+ connection = ((ZkConnection) _zkClient.getConnection());
+ zookeeper = connection.getZookeeper();
+ System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
index 9c71fdd..fdf6e72 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
@@ -215,8 +215,7 @@ public class TestZnodeModify extends ZkUnitTestBase {
zkClient.createPersistent(pathChild1, true);
zkClient.writeData(pathChild1, record);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ logger.error("Interrupted sleep", e);
}
}
}.start();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/ZkHelixTestManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkHelixTestManager.java b/helix-core/src/test/java/org/apache/helix/ZkHelixTestManager.java
deleted file mode 100644
index b660a1d..0000000
--- a/helix-core/src/test/java/org/apache/helix/ZkHelixTestManager.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.helix;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
-
-// ZkHelixManager used for test only. expose more class members
-public class ZkHelixTestManager extends ZKHelixManager {
-
- public ZkHelixTestManager(String clusterName, String instanceName, InstanceType instanceType,
- String zkConnectString) throws Exception {
- super(clusterName, instanceName, instanceType, zkConnectString);
- // TODO Auto-generated constructor stub
- }
-
- public ZkClient getZkClient() {
- return _zkClient;
- }
-
- public List<CallbackHandler> getHandlers() {
- return _handlers;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 369ad68..0d597f6 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -39,10 +39,10 @@ import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -59,8 +59,8 @@ public class TestNewStages extends ZkUnitTestBase {
final int n = 2;
final int p = 8;
final int r = 2;
- MockParticipant[] _participants = new MockParticipant[n];
- ClusterController _controller;
+ MockParticipantManager[] _participants = new MockParticipantManager[n];
+ ClusterControllerManager _controller;
ClusterId _clusterId;
HelixDataAccessor _dataAccessor;
@@ -217,14 +217,14 @@ public class TestNewStages extends ZkUnitTestBase {
r, // replicas
"MasterSlave", true); // do rebalance
- _controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ _controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
_controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- _participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
_participants[i].syncStart();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 26da8ee..fc9b7d5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -254,4 +254,10 @@ public class DummyClusterManager implements HelixManager {
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public void addControllerMessageListener(MessageListener listener) {
+ // No-op stub: the listener argument is ignored and never registered.
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index ba61361..dd5f441 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -31,9 +32,11 @@ import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.State;
import org.apache.helix.api.accessor.ClusterAccessor;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.pipeline.Pipeline;
@@ -43,8 +46,10 @@ import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.ResourceAssignment;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -62,7 +67,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
// ideal state: node0 is MASTER, node1 is SLAVE
// replica=2 means 1 master and 1 slave
- setupIdealState(clusterName, new int[] {
+ List<IdealState> idealStates = setupIdealState(clusterName, new int[] {
0, 1
}, new String[] {
"TestDB"
@@ -78,6 +83,10 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
ClusterEvent event = new ClusterEvent("testEvent");
event.addAttribute("helixmanager", manager);
+ // get an empty best possible output for the partitions
+ BestPossibleStateOutput bestPossOutput = getEmptyBestPossibleStateOutput(idealStates);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossOutput);
+
MessageThrottleStage throttleStage = new MessageThrottleStage();
try {
runStage(event, throttleStage);
@@ -138,7 +147,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
// ideal state: node0 is MASTER, node1 is SLAVE
// replica=2 means 1 master and 1 slave
- setupIdealState(clusterName, new int[] {
+ List<IdealState> idealStates = setupIdealState(clusterName, new int[] {
0, 1
}, new String[] {
"TestDB"
@@ -270,6 +279,10 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
ClusterEvent event = new ClusterEvent("testEvent");
event.addAttribute("helixmanager", manager);
+ // get an empty best possible output for the partitions
+ BestPossibleStateOutput bestPossOutput = getEmptyBestPossibleStateOutput(idealStates);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossOutput);
+
Pipeline dataRefresh = new Pipeline();
dataRefresh.addStage(new ReadClusterDataStage());
runPipeline(event, dataRefresh);
@@ -329,5 +342,18 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
return false;
}
+ private BestPossibleStateOutput getEmptyBestPossibleStateOutput(List<IdealState> idealStates) {
+ BestPossibleStateOutput emptyOutput = new BestPossibleStateOutput(); // no replicas assigned anywhere
+ Map<ParticipantId, State> noReplicas = Collections.emptyMap(); // immutable singleton, safe to share
+ for (IdealState state : idealStates) {
+ ResourceId rid = state.getResourceId();
+ ResourceAssignment emptyAssignment = new ResourceAssignment(rid);
+ for (PartitionId pid : state.getPartitionIdSet()) {
+ emptyAssignment.addReplicaMap(pid, noReplicas); // every partition maps to zero replicas
+ }
+ emptyOutput.setResourceAssignment(rid, emptyAssignment);
+ }
+ return emptyOutput;
+ }
// add pending message test case
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
index 8e7b85a..2dbf5f6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
@@ -27,8 +27,8 @@ import org.testng.annotations.Test;
public class TestParseInfoFromAlert extends ZkStandAloneCMTestBase {
@Test
public void TestParse() {
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+
+ HelixManager manager = _controller;
String instanceName =
StatsAggregationStage.parseInstanceName("localhost_12918.TestStat@DB=123.latency", manager);
@@ -50,6 +50,7 @@ public class TestParseInfoFromAlert extends ZkStandAloneCMTestBase {
String partitionName =
StatsAggregationStage.parsePartitionName(
"localhost_12918.TestStat@DB=TestDB;Partition=TestDB_22.latency", manager);
+
Assert.assertTrue(partitionName.equals("TestDB_22"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 4129f66..2e17ad3 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -37,6 +37,16 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
+import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -138,6 +148,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ HelixManager manager = new DummyClusterManager(clusterName, accessor);
+ ClusterEvent event = new ClusterEvent("testEvent");
+
final String resourceName = "testResource_dup";
String[] resourceGroups = new String[] {
resourceName
@@ -161,8 +174,10 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
clusterAccessor.initClusterStructure();
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
// round1: controller sends O->S to both node0 and node1
Thread.sleep(1000);