You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/08/18 21:28:35 UTC
git commit: [HELIX-499] Controller should listen for all config changes
Repository: helix
Updated Branches:
refs/heads/master 325fe8b14 -> a298f23de
[HELIX-499] Controller should listen for all config changes
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a298f23d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a298f23d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a298f23d
Branch: refs/heads/master
Commit: a298f23defb420aeee1a8d10b7f626433df97576
Parents: 325fe8b
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Aug 18 12:24:34 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Aug 18 12:24:34 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/api/Cluster.java | 16 +-
.../helix/api/accessor/ClusterAccessor.java | 3 +-
.../controller/GenericHelixController.java | 54 ++-
.../helix/controller/HelixControllerMain.java | 3 +
.../controller/stages/ClusterDataCache.java | 24 +-
.../manager/zk/ControllerManagerHelper.java | 5 +
.../helix/manager/zk/ZkCallbackHandler.java | 4 +
.../helix/manager/zk/ZkHelixConnection.java | 9 +-
.../helix/manager/zk/ZkHelixController.java | 6 +
.../org/apache/helix/task/TaskRebalancer.java | 13 +-
.../java/org/apache/helix/task/TaskUtil.java | 33 ++
.../integration/TestZkCallbackHandlerLeak.java | 35 +-
.../manager/TestZkCallbackHandlerLeak.java | 474 -------------------
13 files changed, 177 insertions(+), 502 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 421ff60..2b59b27 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -34,6 +34,7 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SpectatorId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.StateModelDefinition;
@@ -79,6 +80,8 @@ public class Cluster {
private final ClusterConfig _config;
+ private final ClusterDataCache _cache;
+
/**
* construct a cluster
* @param id
@@ -98,7 +101,7 @@ public class Cluster {
ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
Map<StateModelDefId, StateModelDefinition> stateModelMap,
Map<ContextId, ControllerContext> contextMap, UserConfig userConfig, boolean isPaused,
- boolean autoJoinAllowed) {
+ boolean autoJoinAllowed, ClusterDataCache cache) {
// build the config
// Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -140,6 +143,8 @@ public class Cluster {
_contextMap = ImmutableMap.copyOf(contextMap);
+ _cache = cache;
+
// TODO impl this when we persist controllers and spectators on zookeeper
_controllerMap = ImmutableMap.copyOf(controllerMap);
_spectatorMap = Collections.emptyMap();
@@ -286,4 +291,13 @@ public class Cluster {
public ClusterConfig getConfig() {
return _config;
}
+
+ /**
+ * Get a ClusterDataCache object which is a flattened version of the physical properties read to
+ * build this object.
+ * @return ClusterDataCache
+ */
+ public ClusterDataCache getCache() {
+ return _cache;
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index ddf809a..eac2bf8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -244,7 +244,8 @@ public class ClusterAccessor {
// create the cluster snapshot object
return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap, stateModelMap, contextMap, userConfig, isPaused, autoJoinAllowed);
+ clusterConstraintMap, stateModelMap, contextMap, userConfig, isPaused, autoJoinAllowed,
+ _cache);
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index f1c2583..d36b6f5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -34,6 +34,7 @@ import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.InstanceConfigChangeListener;
import org.apache.helix.LiveInstanceChangeListener;
@@ -41,6 +42,7 @@ import org.apache.helix.MessageListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ScopedConfigChangeListener;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.controller.pipeline.Pipeline;
@@ -62,16 +64,21 @@ import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.controller.stages.ResourceValidationStage;
import org.apache.helix.controller.stages.TaskAssignmentStage;
+import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfiguration;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
+import com.google.common.collect.Lists;
+
/**
* Cluster Controllers main goal is to keep the cluster state as close as possible to Ideal State.
* It does this by listening to changes in cluster state and scheduling new tasks to get cluster
@@ -87,7 +94,8 @@ import org.apache.log4j.Logger;
*/
public class GenericHelixController implements IdealStateChangeListener,
LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
- ExternalViewChangeListener, ControllerChangeListener, InstanceConfigChangeListener {
+ ExternalViewChangeListener, ControllerChangeListener, InstanceConfigChangeListener,
+ ScopedConfigChangeListener {
private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
volatile boolean init = false;
private final PipelineRegistry _registry;
@@ -213,6 +221,8 @@ public class GenericHelixController implements IdealStateChangeListener,
registry.register("idealStateChange", dataRefresh, rebalancePipeline);
registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);
registry.register("configChange", dataRefresh, rebalancePipeline);
+ registry.register("instanceConfigChange", dataRefresh, rebalancePipeline);
+ registry.register("resourceConfigChange", dataRefresh, rebalancePipeline);
registry.register("liveInstanceChange", dataRefresh, rebalancePipeline, externalViewPipeline);
registry.register("messageChange", dataRefresh, rebalancePipeline);
@@ -443,7 +453,7 @@ public class GenericHelixController implements IdealStateChangeListener,
}
_cache.setInstanceConfigs(configs);
- ClusterEvent event = new ClusterEvent("configChange");
+ ClusterEvent event = new ClusterEvent("instanceConfigChange");
event.addAttribute("changeContext", changeContext);
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("eventData", configs);
@@ -452,6 +462,46 @@ public class GenericHelixController implements IdealStateChangeListener,
}
@Override
+ public void onConfigChange(List<HelixProperty> configs, NotificationContext context) {
+ logger.info("START: GenericClusterController.onConfigChange()");
+ if (context == null || context.getType() != Type.CALLBACK) {
+ _cache.requireFullRefresh();
+ }
+
+ if (configs == null) {
+ configs = Collections.emptyList();
+ }
+
+ String eventName;
+ String path = context.getPathChanged();
+ if (path.contains(ConfigScopeProperty.RESOURCE.toString())) {
+ List<ResourceConfiguration> resourceConfigs = Lists.newArrayList();
+ for (HelixProperty property : configs) {
+ resourceConfigs.add(new ResourceConfiguration(property.getRecord()));
+ }
+ _cache.setResourceConfigs(resourceConfigs);
+ eventName = "resourceConfigChange";
+ } else if (path.contains(ConfigScopeProperty.CONSTRAINT.toString())) {
+ List<ClusterConstraints> constraints = Lists.newArrayList();
+ for (HelixProperty property : configs) {
+ constraints.add(new ClusterConstraints(property.getRecord()));
+ }
+ _cache.setConstraints(constraints);
+ eventName = "constraintChange";
+ } else {
+ logger.warn("Controller received event for unsupported path: " + path);
+ eventName = "configChange";
+ }
+
+ ClusterEvent event = new ClusterEvent(eventName);
+ event.addAttribute("changeContext", context);
+ event.addAttribute("helixmanager", context.getManager());
+ event.addAttribute("eventData", configs);
+ _eventQueue.put(event);
+ logger.info("END: GenericClusterController.onConfigChange()");
+ }
+
+ @Override
public void onControllerChange(NotificationContext changeContext) {
logger.info("START: GenericClusterController.onControllerChange()");
_cache.requireFullRefresh();
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index ca540c5..b652f35 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -49,6 +49,7 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.HelixManagerShutdownHook;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.participant.DistClusterControllerStateModelFactory;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.log4j.Logger;
@@ -134,6 +135,8 @@ public class HelixControllerMain {
GenericHelixController controller) {
try {
manager.addInstanceConfigChangeListener(controller);
+ manager.addConfigChangeListener(controller, ConfigScopeProperty.RESOURCE);
+ manager.addConfigChangeListener(controller, ConfigScopeProperty.CONSTRAINT);
manager.addLiveInstanceChangeListener(controller);
manager.addIdealStateChangeListener(controller);
// no need for controller to listen on external-view
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 877baf2..53ddd19 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -64,10 +64,12 @@ public class ClusterDataCache {
Map<String, InstanceConfig> _instanceConfigMap;
Map<String, InstanceConfig> _instanceConfigCacheMap;
Map<String, ClusterConstraints> _constraintMap;
+ Map<String, ClusterConstraints> _constraintCacheMap;
Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
Map<String, Map<String, Message>> _messageMap;
Map<String, Map<String, String>> _idealStateRuleMap;
Map<String, ResourceConfiguration> _resourceConfigMap;
+ Map<String, ResourceConfiguration> _resourceConfigCacheMap;
Map<String, ControllerContextHolder> _controllerContextMap;
PauseSignal _pause;
LiveInstance _leader;
@@ -97,17 +99,20 @@ public class ClusterDataCache {
_idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
_liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
_instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+ _resourceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+ _constraintCacheMap = accessor.getChildValuesMap(keyBuilder.constraints());
}
_idealStateMap = Maps.newHashMap(_idealStateCacheMap);
_liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap);
_instanceConfigMap = Maps.newHashMap(_instanceConfigCacheMap);
+ _resourceConfigMap = Maps.newHashMap(_resourceConfigCacheMap);
+ _constraintMap = Maps.newHashMap(_constraintCacheMap);
for (LiveInstance instance : _liveInstanceMap.values()) {
LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
}
_stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
- _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
Map<String, Map<String, Message>> msgMap = new HashMap<String, Map<String, Message>>();
List<PropertyKey> newMessageKeys = Lists.newLinkedList();
@@ -200,7 +205,6 @@ public class ClusterDataCache {
_currentStateMap = Collections.unmodifiableMap(allCurStateMap);
// New in 0.7: Read more information for the benefit of user-defined rebalancers
- _resourceConfigMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
_controllerContextMap = accessor.getChildValuesMap(keyBuilder.controllerContexts());
// Read all single properties together
@@ -370,6 +374,22 @@ public class ClusterDataCache {
_instanceConfigCacheMap = instanceConfigMap;
}
+ public synchronized void setResourceConfigs(List<ResourceConfiguration> resourceConfigs) {
+ Map<String, ResourceConfiguration> resourceConfigMap = Maps.newHashMap();
+ for (ResourceConfiguration resourceConfig : resourceConfigs) {
+ resourceConfigMap.put(resourceConfig.getId(), resourceConfig);
+ }
+ _resourceConfigCacheMap = resourceConfigMap;
+ }
+
+ public synchronized void setConstraints(List<ClusterConstraints> constraints) {
+ Map<String, ClusterConstraints> constraintMap = Maps.newHashMap();
+ for (ClusterConstraints constraint : constraints) {
+ constraintMap.put(constraint.getId(), constraint);
+ }
+ _constraintCacheMap = constraintMap;
+ }
+
/**
* Some partitions might be disabled on specific nodes.
* This method allows one to fetch the set of nodes where a given partition is disabled
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index 623b874..e2def60 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -28,6 +28,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.log4j.Logger;
/**
@@ -73,6 +74,8 @@ public class ControllerManagerHelper {
* setup generic-controller
*/
_manager.addInstanceConfigChangeListener(controller);
+ _manager.addConfigChangeListener(controller, ConfigScopeProperty.RESOURCE);
+ _manager.addConfigChangeListener(controller, ConfigScopeProperty.CONSTRAINT);
_manager.addLiveInstanceChangeListener(controller);
_manager.addIdealStateChangeListener(controller);
// no need for controller to listen on external-view
@@ -92,6 +95,8 @@ public class ControllerManagerHelper {
* reset generic-controller
*/
_manager.removeListener(keyBuilder.instanceConfigs(), controller);
+ _manager.removeListener(keyBuilder.resourceConfigs(), controller);
+ _manager.removeListener(keyBuilder.constraints(), controller);
_manager.removeListener(keyBuilder.liveInstances(), controller);
_manager.removeListener(keyBuilder.idealStates(), controller);
_manager.removeListener(keyBuilder.controller(), controller);
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
index ee420b9..b953e0b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
@@ -332,6 +332,7 @@ public class ZkCallbackHandler implements IZkChildListener, IZkDataListener {
try {
NotificationContext changeContext = new NotificationContext(_manager);
changeContext.setType(NotificationContext.Type.INIT);
+ changeContext.setPathChanged(_path);
invoke(changeContext);
} catch (Exception e) {
String msg = "Exception while invoking init callback for listener:" + _listener;
@@ -346,6 +347,7 @@ public class ZkCallbackHandler implements IZkChildListener, IZkDataListener {
if (dataPath != null && dataPath.startsWith(_path)) {
NotificationContext changeContext = new NotificationContext(_manager);
changeContext.setType(NotificationContext.Type.CALLBACK);
+ changeContext.setPathChanged(_path);
invoke(changeContext);
}
} catch (Exception e) {
@@ -399,6 +401,7 @@ public class ZkCallbackHandler implements IZkChildListener, IZkDataListener {
} else {
changeContext.setType(NotificationContext.Type.CALLBACK);
}
+ changeContext.setPathChanged(_path);
invoke(changeContext);
}
} catch (Exception e) {
@@ -416,6 +419,7 @@ public class ZkCallbackHandler implements IZkChildListener, IZkDataListener {
try {
NotificationContext changeContext = new NotificationContext(_manager);
changeContext.setType(NotificationContext.Type.FINALIZE);
+ changeContext.setPathChanged(_path);
invoke(changeContext);
} catch (Exception e) {
String msg = "Exception while resetting the listener:" + _listener;
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
index 0a9dc94..38332c5 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
@@ -37,7 +37,6 @@ import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixMultiClusterController;
import org.apache.helix.HelixConnection;
import org.apache.helix.HelixConnectionStateListener;
import org.apache.helix.HelixConstants.ChangeType;
@@ -45,6 +44,7 @@ import org.apache.helix.HelixController;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.HelixMultiClusterController;
import org.apache.helix.HelixParticipant;
import org.apache.helix.HelixRole;
import org.apache.helix.IdealStateChangeListener;
@@ -210,7 +210,8 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
}
@Override
- public HelixMultiClusterController createAutoController(ClusterId clusterId, ControllerId controllerId) {
+ public HelixMultiClusterController createAutoController(ClusterId clusterId,
+ ControllerId controllerId) {
return new ZkHelixMultiClusterController(this, clusterId, controllerId);
}
@@ -327,6 +328,8 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
case RESOURCE:
propertyKey = keyBuilder.resourceConfigs();
break;
+ case CONSTRAINT:
+ propertyKey = keyBuilder.constraints();
default:
break;
}
@@ -502,7 +505,7 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
@Override
public ClusterMessagingService createMessagingService(HelixRole role) {
HelixManager manager = new ZKHelixManager(role);
- return new DefaultMessagingService(manager);
+ return new DefaultMessagingService(manager);
}
void addListener(HelixRole role, Object listener, PropertyKey propertyKey, ChangeType changeType,
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
index 0698945..295b69c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -39,6 +39,7 @@ import org.apache.helix.api.id.Id;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.StatusDumpTask;
import org.apache.log4j.Logger;
@@ -226,6 +227,9 @@ public class ZkHelixController implements HelixController {
* setup generic-controller
*/
_connection.addInstanceConfigChangeListener(this, pipeline, _clusterId);
+ _connection.addConfigChangeListener(this, pipeline, _clusterId, ConfigScopeProperty.RESOURCE);
+ _connection.addConfigChangeListener(this, pipeline, _clusterId,
+ ConfigScopeProperty.CONSTRAINT);
_connection.addLiveInstanceChangeListener(this, pipeline, _clusterId);
_connection.addIdealStateChangeListener(this, pipeline, _clusterId);
_connection.addControllerListener(this, pipeline, _clusterId);
@@ -242,6 +246,8 @@ public class ZkHelixController implements HelixController {
* reset generic-controller
*/
_connection.removeListener(this, pipeline, keyBuilder.instanceConfigs());
+ _connection.removeListener(this, pipeline, keyBuilder.resourceConfigs());
+ _connection.removeListener(this, pipeline, keyBuilder.constraints());
_connection.removeListener(this, pipeline, keyBuilder.liveInstances());
_connection.removeListener(this, pipeline, keyBuilder.idealStates());
_connection.removeListener(this, pipeline, keyBuilder.controller());
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 8f861cd..af89944 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -52,6 +52,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
import org.apache.log4j.Logger;
import com.google.common.base.Joiner;
@@ -124,11 +125,19 @@ public abstract class TaskRebalancer implements HelixRebalancer {
final String resourceName = resource.getId().toString();
// Fetch job configuration
- JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
+ Map<String, ResourceConfiguration> resourceConfigs =
+ clusterData.getCache().getResourceConfigs();
+ JobConfig jobCfg = TaskUtil.getJobCfg(resourceConfigs.get(resourceName));
+ if (jobCfg == null) {
+ return emptyAssignment(resourceName, currStateOutput);
+ }
String workflowResource = jobCfg.getWorkflow();
// Fetch workflow configuration and context
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+ WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(resourceConfigs.get(workflowResource));
+ if (workflowCfg == null) {
+ return emptyAssignment(resourceName, currStateOutput);
+ }
WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
// Initialize workflow context if needed
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 17b610d..cca7d76 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -68,6 +68,16 @@ public class TaskUtil {
*/
public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
HelixProperty jobResourceConfig = getResourceConfig(manager, jobResource);
+ return getJobCfg(jobResourceConfig);
+ }
+
+ /**
+ * Parses job resource configurations directly from a property into a {@link JobConfig}.
+ * @param jobResourceConfig the property containing the configuration
+ * @return A {@link JobConfig} object if the property contains valid configurations for the job, null
+ * otherwise.
+ */
+ public static JobConfig getJobCfg(HelixProperty jobResourceConfig) {
if (jobResourceConfig == null) {
return null;
}
@@ -93,6 +103,29 @@ public class TaskUtil {
*/
public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
+ return getWorkflowCfg(workflowCfg);
+ }
+
+ /**
+ * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
+ * @param workflowResourceConfig the property containing the configurations
+ * @return A {@link WorkflowConfig} object if the property contains valid configurations for the
+ * workflow, null otherwise.
+ */
+ public static WorkflowConfig getWorkflowCfg(HelixProperty workflowResourceConfig) {
+ if (workflowResourceConfig == null) {
+ return null;
+ }
+ return getWorkflowCfg(workflowResourceConfig.getRecord().getSimpleFields());
+ }
+
+ /**
+ * Parses a key-value map into a {@link WorkflowConfig} object.
+ * @param workflowCfg the map of configurations
+ * @return A {@link WorkflowConfig} object if the map contains valid configurations for the
+ * workflow, null otherwise.
+ */
+ private static WorkflowConfig getWorkflowCfg(Map<String, String> workflowCfg) {
if (workflowCfg == null) {
return null;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 9e8fd85..190f739 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -31,8 +31,8 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
+import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.model.CurrentState;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -60,8 +60,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- final MockController controller =
- new MockController(_zkaddr, clusterName, "controller_0");
+ final MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
// start participants
@@ -92,10 +91,10 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
// controller should have 5 + 2n + m + (m+2)n zk-watchers
// where n is number of nodes and m is number of resources
- return watchPaths.size() == (6 + 5 * n);
+ return watchPaths.size() == (8 + 5 * n);
}
}, 500);
- Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers.");
+ Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers.");
// check participant zk-watchers
result = TestHelper.verify(new TestHelper.Verifier() {
@@ -117,8 +116,8 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
TestHelper.printHandlers(participantManagerToExpire, participantManagerToExpire.getHandlers());
int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManagerToExpire.getHandlers().size();
- Assert.assertEquals(controllerHandlerNb, 10,
- "HelixController should have 10 (6+2n) callback handlers for 2 (n) participant");
+ Assert.assertEquals(controllerHandlerNb, 12,
+ "HelixController should have 10 (8+2n) callback handlers for 2 (n) participant");
Assert.assertEquals(particHandlerNb, 1,
"HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
@@ -147,10 +146,10 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
// controller should have 5 + 2n + m + (m+2)n zk-watchers
// where n is number of nodes and m is number of resources
- return watchPaths.size() == (6 + 5 * n);
+ return watchPaths.size() == (8 + 5 * n);
}
}, 500);
- Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
+ Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers after session expiry.");
// check participant zk-watchers
result = TestHelper.verify(new TestHelper.Verifier() {
@@ -205,8 +204,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- final MockController controller =
- new MockController(_zkaddr, clusterName, "controller_0");
+ final MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
// start participants
@@ -244,8 +242,8 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManager.getHandlers().size();
TestHelper.printHandlers(controller, controller.getHandlers());
- Assert.assertEquals(controllerHandlerNb, 10,
- "HelixController should have 10 (6+2n) callback handlers for 2 participant, but was "
+ Assert.assertEquals(controllerHandlerNb, 12,
+ "HelixController should have 12 (8+2n) callback handlers for 2 participant, but was "
+ controllerHandlerNb);
TestHelper.printHandlers(participantManager, participantManager.getHandlers());
Assert.assertEquals(particHandlerNb, 1,
@@ -277,10 +275,10 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
// controller should have 5 + 2n + m + (m+2)n zk-watchers
// where n is number of nodes and m is number of resources
- return watchPaths.size() == (6 + 5 * n);
+ return watchPaths.size() == (8 + 5 * n);
}
}, 500);
- Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
+ Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers after session expiry.");
// check participant zk-watchers
result = TestHelper.verify(new TestHelper.Verifier() {
@@ -301,8 +299,11 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
// printHandlers(controllerManager);
int handlerNb = controller.getHandlers().size();
TestHelper.printHandlers(controller, controller.getHandlers());
- Assert.assertEquals(handlerNb, controllerHandlerNb,
- "controller callback handlers should not increase after participant session expiry");
+ Assert
+ .assertTrue(
+ handlerNb <= controllerHandlerNb,
+ "controller callback handlers should not increase after controller session expiry (expected no more than "
+ + controllerHandlerNb + ", found " + handlerNb + ")");
handlerNb = participantManager.getHandlers().size();
TestHelper.printHandlers(participantManager, participantManager.getHandlers());
Assert.assertEquals(handlerNb, particHandlerNb,
http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
deleted file mode 100644
index 1393231..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
+++ /dev/null
@@ -1,474 +0,0 @@
-package org.apache.helix.integration.manager;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZkTestHelper;
-import org.apache.helix.manager.zk.MockParticipant;
-import org.apache.helix.manager.zk.MockController;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.testutil.ZkTestBase;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestZkCallbackHandlerLeak extends ZkTestBase {
- private static Logger LOG = Logger.getLogger(TestZkCallbackHandlerLeak.class);
-
- @Test
- public void testCbHdlrLeakOnParticipantSessionExpiry() throws Exception {
- // Logger.getRootLogger().setLevel(Level.INFO);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- final int n = 2;
-
- System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 32, // partitions per resource
- n, // number of nodes
- 2, // replicas
- "MasterSlave", true); // do rebalance
-
- // start controller
- final MockController controller = new MockController(_zkaddr, clusterName, "controller");
- controller.connect();
-
- // start participants
- MockParticipant[] participants = new MockParticipant[n];
- for (int i = 0; i < n; i++) {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
- clusterName));
- Assert.assertTrue(result);
-
- // check controller zk-watchers
- result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
- LOG.debug("all watchers: " + watchers);
- Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
- LOG.debug("controller watch paths: " + watchPaths);
-
- // controller should have 5 + 2n + m + (m+2)n zk-watchers
- // where n is number of nodes and m is number of resources
- return watchPaths.size() == (6 + 5 * n);
- }
- }, 500);
- Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers.");
-
- // check participant zk-watchers
- final MockParticipant participantManagerToExpire = participants[0];
- result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
- Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
- LOG.debug("participant watch paths: " + watchPaths);
-
- // participant should have 1 zk-watcher: 1 for MESSAGE
- return watchPaths.size() == 1;
- }
- }, 500);
- Assert.assertTrue(result, "Participant should have 1 zk-watcher.");
-
- // check HelixManager#_handlers
- // printHandlers(controllerManager);
- // printHandlers(participantManagerToExpire);
- int controllerHandlerNb = controller.getHandlers().size();
- int particHandlerNb = participantManagerToExpire.getHandlers().size();
- Assert.assertEquals(controllerHandlerNb, (6 + 2 * n),
- "HelixController should have 10 (5+2n) callback handlers for 2 (n) participant");
- Assert.assertEquals(particHandlerNb, 1,
- "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
-
- // expire the session of participant
- LOG.debug("Expiring participant session...");
- String oldSessionId = participantManagerToExpire.getSessionId();
-
- ZkTestHelper.expireSession(participantManagerToExpire.getZkClient());
- String newSessionId = participantManagerToExpire.getSessionId();
- LOG.debug("Expried participant session. oldSessionId: " + oldSessionId + ", newSessionId: "
- + newSessionId);
-
- result =
- ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
- _zkaddr, clusterName));
- Assert.assertTrue(result);
-
- // check controller zk-watchers
- result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
- Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
- LOG.debug("controller watch paths after session expiry: " + watchPaths);
-
- // controller should have 5 + 2n + m + (m+2)n zk-watchers
- // where n is number of nodes and m is number of resources
- return watchPaths.size() == (6 + 5 * n);
- }
- }, 500);
- Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
-
- // check participant zk-watchers
- result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
- Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
- LOG.debug("participant watch paths after session expiry: " + watchPaths);
-
- // participant should have 1 zk-watcher: 1 for MESSAGE
- return watchPaths.size() == 1;
- }
- }, 500);
- Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
-
- // check handlers
- // printHandlers(controllerManager);
- // printHandlers(participantManagerToExpire);
- int handlerNb = controller.getHandlers().size();
- Assert.assertEquals(handlerNb, controllerHandlerNb,
- "controller callback handlers should not increase after participant session expiry");
- handlerNb = participantManagerToExpire.getHandlers().size();
- Assert.assertEquals(handlerNb, particHandlerNb,
- "participant callback handlers should not increase after participant session expiry");
-
- System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
- }
-
- @Test
- public void testCbHdlrLeakOnControllerSessionExpiry() throws Exception {
- // Logger.getRootLogger().setLevel(Level.INFO);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- final int n = 2;
-
- System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 32, // partitions per resource
- n, // number of nodes
- 2, // replicas
- "MasterSlave", true); // do rebalance
-
- final MockController controller = new MockController(_zkaddr, clusterName, "controller");
- controller.syncStart();
-
- // start participants
- MockParticipant[] participants = new MockParticipant[n];
- for (int i = 0; i < n; i++) {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
- clusterName));
- Assert.assertTrue(result);
-
- // wait until we get all the listeners registered
- final MockParticipant participantManager = participants[0];
- result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- int controllerHandlerNb = controller.getHandlers().size();
- int particHandlerNb = participantManager.getHandlers().size();
- if (controllerHandlerNb == 10 && particHandlerNb == 2)
- return true;
- else
- return false;
- }
- }, 1000);
-
- int controllerHandlerNb = controller.getHandlers().size();
- int particHandlerNb = participantManager.getHandlers().size();
- TestHelper.printHandlers(controller, controller.getHandlers());
- TestHelper.printHandlers(participantManager, participantManager.getHandlers());
- Assert.assertEquals(controllerHandlerNb, (6 + 2 * n),
- "HelixController should have 10 (6+2n) callback handlers for 2 participant, but was "
- + controllerHandlerNb);
- Assert.assertEquals(particHandlerNb, 1,
- "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
- + particHandlerNb);
-
- // expire controller
- LOG.debug("Expiring controller session...");
- String oldSessionId = controller.getSessionId();
-
- ZkTestHelper.expireSession(controller.getZkClient());
- String newSessionId = controller.getSessionId();
- LOG.debug("Expired controller session. oldSessionId: " + oldSessionId + ", newSessionId: "
- + newSessionId);
-
- result =
- ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
- _zkaddr, clusterName));
- Assert.assertTrue(result);
-
- // check controller zk-watchers
- result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
- Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
- // System.out.println("controller watch paths after session expiry: " +
- // watchPaths);
-
- // controller should have 5 + 2n + m + (m+2)n zk-watchers
- // where n is number of nodes and m is number of resources
- return watchPaths.size() == (6 + 5 * n);
- }
- }, 500);
- Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
-
- // check participant zk-watchers
- result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
- Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId());
- LOG.debug("participant watch paths after session expiry: " + watchPaths);
-
- // participant should have 1 zk-watcher: 1 for MESSAGE
- return watchPaths.size() == 1;
- }
- }, 500);
- Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
-
- // check HelixManager#_handlers
- // printHandlers(controllerManager);
- int handlerNb = controller.getHandlers().size();
- TestHelper.printHandlers(controller, controller.getHandlers());
- Assert.assertEquals(handlerNb, controllerHandlerNb,
- "controller callback handlers should not increase after participant session expiry");
- handlerNb = participantManager.getHandlers().size();
- TestHelper.printHandlers(participantManager, participantManager.getHandlers());
- Assert.assertEquals(handlerNb, particHandlerNb,
- "participant callback handlers should not increase after participant session expiry");
-
- System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
- }
-
- @Test
- public void testRemoveUserCbHdlrOnPathRemoval() throws Exception {
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- final int n = 3;
- final String zkAddr = _zkaddr;
- System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
- 32, // partitions
- n, // nodes
- 2, // replicas
- "MasterSlave", true);
-
- final MockController controller = new MockController(zkAddr, clusterName, "controller");
- controller.syncStart();
-
- MockParticipant[] participants = new MockParticipant[n];
- for (int i = 0; i < n; i++) {
- String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(zkAddr, clusterName, instanceName);
- participants[i].syncStart();
-
- // register a controller listener on participant_0
- if (i == 0) {
- MockParticipant manager = participants[0];
- manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
- @Override
- public void onStateChange(String instanceName, List<CurrentState> statesInfo,
- NotificationContext changeContext) {
- // To change body of implemented methods use File | Settings | File Templates.
- // System.out.println(instanceName + " on current-state change, type: " +
- // changeContext.getType());
- }
- }, manager.getInstanceName(), manager.getSessionId());
- }
- }
-
- Boolean result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
- clusterName));
- Assert.assertTrue(result);
-
- MockParticipant participantToExpire = participants[0];
- String oldSessionId = participantToExpire.getSessionId();
- PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
-
- // check manager#hanlders
- Assert.assertEquals(participantToExpire.getHandlers().size(), 2,
- "Should have 2 handlers: CURRENTSTATE/{sessionId}, and MESSAGES");
-
- // check zkclient#listeners
- Map<String, Set<IZkDataListener>> dataListeners =
- ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
- Map<String, Set<IZkChildListener>> childListeners =
- ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
- // printZkListeners(participantToExpire.getZkClient());
- Assert.assertEquals(dataListeners.size(), 1,
- "Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1 data-listeners");
- String path =
- keyBuilder.currentState(participantToExpire.getInstanceName(), oldSessionId, "TestDB0")
- .getPath();
- Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: "
- + path);
- Assert
- .assertEquals(childListeners.size(), 2,
- "Should have 2 paths (CURRENTSTATE/{sessionId}, and MESSAGES) each of which has 1 child-listener");
- path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
- Assert.assertEquals(childListeners.get(path).size(), 1,
- "Should have 1 child-listener on path: " + path);
- path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
- Assert.assertEquals(childListeners.get(path).size(), 1,
- "Should have 1 child-listener on path: " + path);
- path = keyBuilder.controller().getPath();
- Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
-
- // check zookeeper#watches on client side
- Map<String, List<String>> watchPaths =
- ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
- LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
- Assert
- .assertEquals(watchPaths.get("dataWatches").size(), 3,
- "Should have 3 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
- "Should have 2 child-watches: MESSAGES, and CURRENTSTATE/{sessionId}");
-
- // expire localhost_12918
- System.out.println("Expire participant: " + participantToExpire.getInstanceName()
- + ", session: " + participantToExpire.getSessionId());
- ZkTestHelper.expireSession(participantToExpire.getZkClient());
- String newSessionId = participantToExpire.getSessionId();
- System.out.println(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId
- + ", newSessionId: " + newSessionId);
- result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
- clusterName));
- Assert.assertTrue(result);
-
- // check manager#hanlders
- Assert
- .assertEquals(
- participantToExpire.getHandlers().size(),
- 1,
- "Should have 1 handler: MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
-
- // check zkclient#listeners
- dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
- childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
- // printZkListeners(participantToExpire.getZkClient());
- Assert.assertTrue(dataListeners.isEmpty(), "Should have no data-listeners");
- Assert
- .assertEquals(
- childListeners.size(),
- 2,
- "Should have 2 paths (CURRENTSTATE/{oldSessionId}, and MESSAGES). "
- + "MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
- path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
- Assert.assertEquals(childListeners.get(path).size(), 0,
- "Should have no child-listener on path: " + path);
- path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
- Assert.assertEquals(childListeners.get(path).size(), 1,
- "Should have 1 child-listener on path: " + path);
- path = keyBuilder.controller().getPath();
- Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
-
- // check zookeeper#watches on client side
- watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
- LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
- Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
- "Should have 1 data-watch: MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
- "Should have 1 child-watch: MESSAGES");
- Assert
- .assertEquals(watchPaths.get("existWatches").size(), 2,
- "Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
-
- // another session expiry on localhost_12918 should clear the two exist-watches on
- // CURRENTSTATE/{oldSessionId}
- System.out.println("Expire participant: " + participantToExpire.getInstanceName()
- + ", session: " + participantToExpire.getSessionId());
- ZkTestHelper.expireSession(participantToExpire.getZkClient());
- result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
- clusterName));
- Assert.assertTrue(result);
-
- // check zookeeper#watches on client side
- watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
- LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
- Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
- "Should have 1 data-watch: MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
- "Should have 1 child-watch: MESSAGES");
- Assert
- .assertEquals(
- watchPaths.get("existWatches").size(),
- 0,
- "Should have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession");
-
- // Thread.sleep(1000);
- System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
- }
-}