You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/02/04 02:50:14 UTC
git commit: HELIX-40: fix zkclient subscribe path leaking and zk
callback-handler leaking in case of session expiry
Updated Branches:
refs/heads/master c52baf615 -> 03423e624
HELIX-40: fix zkclient subscribe path leaking and zk callback-handler leaking in case of session expiry
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/03423e62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/03423e62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/03423e62
Branch: refs/heads/master
Commit: 03423e6248fb970251c4443ddba94fac79076d93
Parents: c52baf6
Author: zzhang <zz...@uci.edu>
Authored: Sun Feb 3 17:50:00 2013 -0800
Committer: zzhang <zz...@uci.edu>
Committed: Sun Feb 3 17:50:00 2013 -0800
----------------------------------------------------------------------
.../main/java/org/apache/helix/HelixManager.java | 2 +-
.../helix/controller/GenericHelixController.java | 135 ++++++-----
.../helix/controller/HelixControllerMain.java | 3 +-
.../apache/helix/manager/zk/CallbackHandler.java | 10 +-
.../apache/helix/manager/zk/ZKHelixManager.java | 127 +++++-----
.../participant/GenericLeaderStandbyModel.java | 27 ++-
.../src/test/java/org/apache/helix/Mocks.java | 2 +-
.../test/java/org/apache/helix/ZkTestHelper.java | 3 +-
.../controller/stages/DummyClusterManager.java | 3 +-
.../integration/TestZkCallbackHandlerLeak.java | 197 +++++++++++++++
.../helix/manager/zk/TestLiveInstanceBounce.java | 11 +-
.../helix/manager/zk/TestZkClusterManager.java | 4 +-
.../helix/mock/controller/ClusterController.java | 18 +-
.../helix/mock/participant/MockParticipant.java | 2 +-
.../helix/participant/MockZKHelixManager.java | 3 +-
15 files changed, 389 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/HelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 4d3adc8..7c91cd6 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -176,7 +176,7 @@ public interface HelixManager
* @param listener
* @return
*/
- boolean removeListener(Object listener);
+ boolean removeListener(PropertyKey key, Object listener);
/**
* Return the client to perform read/write operations on the cluster data
* store
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index ba22b63..deb2bdb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -21,11 +21,14 @@ package org.apache.helix.controller;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ConfigChangeListener;
@@ -101,18 +104,9 @@ public class GenericHelixController implements
volatile boolean init = false;
private final PipelineRegistry _registry;
- /**
- * Since instance current state is per-session-id, we need to track the session-ids of
- * the current states that the ClusterController is observing. this set contains all the
- * session ids that we add currentState listener
- */
- private final Set<String> _instanceCurrentStateChangeSubscriptionSessionIds;
-
- /**
- * this set contains all the instance names that we add message listener
- */
- private final Set<String> _instanceSubscriptionNames;
-
+ final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
+ final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
+
ClusterStatusMonitor _clusterStatusMonitor;
@@ -264,10 +258,8 @@ public class GenericHelixController implements
{
_paused = false;
_registry = registry;
- _instanceCurrentStateChangeSubscriptionSessionIds =
- new ConcurrentSkipListSet<String>();
- _instanceSubscriptionNames = new ConcurrentSkipListSet<String>();
- // _externalViewGenerator = new ExternalViewGenerator();
+ _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
+ _lastSeenSessions = new AtomicReference<Map<String,LiveInstance>>();
}
/**
@@ -563,55 +555,72 @@ public class GenericHelixController implements
protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
NotificationContext changeContext)
{
- for (LiveInstance instance : liveInstances)
- {
- String instanceName = instance.getId();
- String clientSessionId = instance.getSessionId();
- HelixManager manager = changeContext.getManager();
- // _instanceCurrentStateChangeSubscriptionSessionIds contains all the sessionIds
- // that we've added a currentState listener
- if (!_instanceCurrentStateChangeSubscriptionSessionIds.contains(clientSessionId))
- {
- try
- {
- manager.addCurrentStateChangeListener(this, instanceName, clientSessionId);
- _instanceCurrentStateChangeSubscriptionSessionIds.add(clientSessionId);
- logger.info("Observing client session id: " + clientSessionId);
- }
- catch (Exception e)
- {
- logger.error("Exception adding current state and message listener for instance:"
- + instanceName,
- e);
- }
- }
-
- // _instanceSubscriptionNames contains all the instanceNames that we've added a
- // message listener
- if (!_instanceSubscriptionNames.contains(instanceName))
- {
- try
- {
- logger.info("Adding message listener for " + instanceName);
- manager.addMessageListener(this, instanceName);
- _instanceSubscriptionNames.add(instanceName);
- }
- catch (Exception e)
- {
- logger.error("Exception adding message listener for instance:" + instanceName,
- e);
- }
- }
-
- // TODO we need to remove currentState listeners and message listeners
- // when a session or an instance no longer exists. This may happen
- // in case of session expiry, participant rebound, participant goes and new
- // participant comes
-
- // TODO shi should call removeListener on the previous session id;
- // but the removeListener with that functionality is not implemented yet
+ // construct maps for current live-instances
+ Map<String, LiveInstance> curInstances = new HashMap<String, LiveInstance>();
+ Map<String, LiveInstance> curSessions = new HashMap<String, LiveInstance>();
+ for(LiveInstance liveInstance : liveInstances) {
+ curInstances.put(liveInstance.getInstanceName(), liveInstance);
+ curSessions.put(liveInstance.getSessionId(), liveInstance);
+ }
+
+ Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();
+ Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();
+
+ HelixManager manager = changeContext.getManager();
+ Builder keyBuilder = new Builder(manager.getClusterName());
+ if (lastSessions != null) {
+ for (String session : lastSessions.keySet()) {
+ if (!curSessions.containsKey(session)) {
+ // remove current-state listener for expired session
+ String instanceName = lastSessions.get(session).getInstanceName();
+ manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
+ }
+ }
+ }
+
+ if (lastInstances != null) {
+ for (String instance : lastInstances.keySet()) {
+ if (!curInstances.containsKey(instance)) {
+ // remove message listener for disconnected instances
+ manager.removeListener(keyBuilder.messages(instance), this);
+ }
+ }
}
+
+ for (String session : curSessions.keySet()) {
+ if (lastSessions == null || !lastSessions.containsKey(session)) {
+ String instanceName = curSessions.get(session).getInstanceName();
+ try {
+ // add current-state listeners for new sessions
+ manager.addCurrentStateChangeListener(this, instanceName, session);
+ logger.info("Succeed in addling current state listener for instance: " + instanceName + " with session: " + session);
+
+ } catch (Exception e) {
+ logger.error("Fail to add current state listener for instance: "
+ + instanceName + " with session: " + session, e);
+ }
+ }
+ }
+
+ for (String instance : curInstances.keySet()) {
+ if (lastInstances == null || !lastInstances.containsKey(instance)) {
+ try {
+ // add message listeners for new instances
+ manager.addMessageListener(this, instance);
+ logger.info("Succeed in adding message listener for " + instance);
+ }
+ catch (Exception e)
+ {
+ logger.error("Fail to add message listener for instance:" + instance, e);
+ }
+ }
+ }
+
+ // update last-seen
+ _lastSeenInstances.set(curInstances);
+ _lastSeenSessions.set(curSessions);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index 338e03c..b595879 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -151,7 +151,8 @@ public class HelixControllerMain
manager.addConfigChangeListener(controller);
manager.addLiveInstanceChangeListener(controller);
manager.addIdealStateChangeListener(controller);
- manager.addExternalViewChangeListener(controller);
+ // no need for controller to listen on external-view
+ // manager.addExternalViewChangeListener(controller);
manager.addControllerListener(controller);
} catch (ZkInterruptedException e)
{
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 4e08ab0..0e70a17 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -350,7 +350,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
}
catch (ZkNoNodeException e)
{
- logger.warn("fail to subscribe child/data change@" + path, e);
+ logger.warn("fail to subscribe child/data change@. path: " + path
+ + ", listener: " + _listener, e);
}
}
@@ -396,6 +397,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
}
catch (Exception e)
{
+ logger.error("exception in handling data-change. path: " + dataPath
+ + ", listener: " + _listener);
ZKExceptionHandler.getInstance().handle(e);
}
}
@@ -412,7 +415,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
+ dataPath + ", listener: " + _listener);
_zkClient.unsubscribeDataChanges(dataPath, this);
- // only needed for bucketized parent, but OK if we don't have child-change watch on the path
+ // only needed for bucketized parent, but OK if we don't have child-change
+ // watch on the bucketized parent path
logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: "
+ dataPath + ", listener: " + _listener);
_zkClient.unsubscribeChildChanges(dataPath, this);
@@ -444,6 +448,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
}
catch (Exception e)
{
+ logger.error("exception in handling child-change. parentPath: "
+ + parentPath + ", listener: " + _listener);
ZKExceptionHandler.getInstance().handle(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index f542993..18c3bd5 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -102,7 +102,7 @@ public class ZKHelixManager implements HelixManager
private ZKHelixDataAccessor _helixAccessor;
private ConfigAccessor _configAccessor;
protected ZkClient _zkClient;
- protected final List<CallbackHandler> _handlers;
+ protected List<CallbackHandler> _handlers;
private final ZkStateChangeListener _zkStateChangeListener;
private final InstanceType _instanceType;
volatile String _sessionId;
@@ -201,8 +201,6 @@ public class ZKHelixManager implements HelixManager
_zkStateChangeListener = new ZkStateChangeListener(this, _flappingTimeWindowMs, _maxDisconnectThreshold);
_timer = null;
- _handlers = new ArrayList<CallbackHandler>();
-
_messagingService = new DefaultMessagingService(this);
_version =
@@ -246,6 +244,35 @@ public class ZKHelixManager implements HelixManager
return true;
}
+ @Override
+ public boolean removeListener(PropertyKey key, Object listener)
+ {
+ logger.info("Removing listener: " + listener + " on path: " + key.getPath()
+ + " from cluster: " + _clusterName + " by instance: " + _instanceName);
+
+ synchronized (this)
+ {
+ List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
+ for (CallbackHandler handler : _handlers)
+ {
+ // match on property-key path and on listener (both compared via equals())
+ if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener))
+ {
+ toRemove.add(handler);
+ }
+ }
+
+ _handlers.removeAll(toRemove);
+
+ // handler.reset() may modify the handlers list, so do it outside the iteration
+ for (CallbackHandler handler : toRemove) {
+ handler.reset();
+ }
+ }
+
+ return true;
+ }
+
private void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType, EventType[] eventType)
{
checkConnected();
@@ -494,30 +521,6 @@ public class ZKHelixManager implements HelixManager
return -1;
}
- @Override
- public boolean removeListener(Object listener)
- {
- logger.info("remove listener: " + listener + " from cluster: " + _clusterName
- + ", instance: " + _instanceName);
-
- synchronized (this)
- {
- Iterator<CallbackHandler> iterator = _handlers.iterator();
- while (iterator.hasNext())
- {
- CallbackHandler handler = iterator.next();
- // simply compare reference
- if (handler.getListener().equals(listener))
- {
- handler.reset();
- iterator.remove();
- }
- }
- }
-
- return true;
- }
-
private void addLiveInstance()
{
LiveInstance liveInstance = new LiveInstance(_instanceName);
@@ -686,7 +689,10 @@ public class ZKHelixManager implements HelixManager
}
_baseDataAccessor.reset();
+ // reset all handlers so they have a chance to unsubscribe zk changes from zkclient
+ // abandon all callback-handlers added in expired session
resetHandlers();
+ _handlers = new ArrayList<CallbackHandler>();
logger.info("Handling new session, session id:" + _sessionId + ", instance:"
+ _instanceName + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
@@ -730,22 +736,13 @@ public class ZKHelixManager implements HelixManager
.registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(),
defaultParticipantErrorMessageHandlerFactory);
- if (_leaderElectionHandler == null)
- {
- final String path =
- PropertyPathConfig.getPath(PropertyType.CONTROLLER, _clusterName);
-
- _leaderElectionHandler =
+ // create a new leader-election handler for a new session
+ _leaderElectionHandler =
createCallBackHandler(new Builder(_clusterName).controller(),
new DistClusterControllerElection(_zkConnectString),
new EventType[] { EventType.NodeChildrenChanged,
EventType.NodeDeleted, EventType.NodeCreated },
ChangeType.CONTROLLER);
- }
- else
- {
- _leaderElectionHandler.init();
- }
}
if (_instanceType == InstanceType.PARTICIPANT
@@ -830,42 +827,38 @@ public class ZKHelixManager implements HelixManager
{
synchronized (this)
{
- // get a copy of the list and iterate over the copy list
- // in case handler.reset() will modify the original handler list
- List<CallbackHandler> handlers = new ArrayList<CallbackHandler>();
- handlers.addAll(_handlers);
-
- for (CallbackHandler handler : handlers)
- {
- handler.reset();
- logger.info("reset handler: " + handler.getPath() + " by "
- + Thread.currentThread().getName());
- }
+ if (_handlers != null)
+ {
+ // get a copy of the list and iterate over the copy list
+ // in case handler.reset() will modify the original handler list
+ List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
+ tmpHandlers.addAll(_handlers);
+
+ for (CallbackHandler handler : tmpHandlers)
+ {
+ handler.reset();
+ logger.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
+ }
+ }
}
}
private void initHandlers()
{
- // may add new currentState and message listeners during init()
- // so make a copy and iterate over the copy
synchronized (this)
{
- List<CallbackHandler> handlers = new ArrayList<CallbackHandler>();
- handlers.addAll(_handlers);
- for (CallbackHandler handler : handlers)
- {
- handler.init();
- }
- }
- }
-
- private void addListener(CallbackHandler handler)
- {
- synchronized (this)
- {
- _handlers.add(handler);
- logger.info("add handler: " + handler.getPath() + " by "
- + Thread.currentThread().getName());
+ if (_handlers != null)
+ {
+ // may add new currentState and message listeners during init()
+ // so make a copy and iterate over the copy
+ List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
+ tmpHandlers.addAll(_handlers);
+ for (CallbackHandler handler : tmpHandlers)
+ {
+ handler.init();
+ logger.info("init handler: " + handler.getPath() + ", " + handler.getListener());
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
index a638bc7..f45ba6c 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
@@ -89,7 +90,31 @@ public class GenericLeaderStandbyModel extends StateModel
{
LOG.info("Become STANDBY from LEADER");
HelixManager manager = context.getManager();
- manager.removeListener(_particHolder);
+ if (manager == null)
+ {
+ throw new IllegalArgumentException("Require HelixManager in notification conext");
+ }
+
+ Builder keyBuilder = new Builder(manager.getClusterName());
+ for (ChangeType notificationType : _notificationTypes)
+ {
+ if (notificationType == ChangeType.LIVE_INSTANCE)
+ {
+ manager.removeListener(keyBuilder.liveInstances(), _particHolder);
+ }
+ else if (notificationType == ChangeType.CONFIG)
+ {
+ manager.removeListener(keyBuilder.instanceConfigs(), _particHolder);
+ }
+ else if (notificationType == ChangeType.EXTERNAL_VIEW)
+ {
+ manager.removeListener(keyBuilder.externalViews(), _particHolder);
+ }
+ else
+ {
+ LOG.error("Unsupport notificationType:" + notificationType.toString());
+ }
+ }
}
@Transition(to="OFFLINE",from="STANDBY")
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 2603272..66e7fa3 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -384,7 +384,7 @@ public class Mocks {
}
@Override
- public boolean removeListener(Object listener) {
+ public boolean removeListener(PropertyKey key, Object listener) {
// TODO Auto-generated method stub
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index 1bced7f..c74d650 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -23,6 +23,7 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -34,6 +35,7 @@ import org.I0Itec.zkclient.ZkConnection;
import org.apache.helix.InstanceType;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -352,5 +354,4 @@ public class ZkTestHelper
return listenerMapBySession;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 25c139b..cad41d0 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -36,6 +36,7 @@ import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.MessageListener;
import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyKey;
import org.apache.helix.ScopedConfigChangeListener;
import org.apache.helix.ZNRecord;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
@@ -121,7 +122,7 @@ public class DummyClusterManager implements HelixManager
}
@Override
- public boolean removeListener(Object listener)
+ public boolean removeListener(PropertyKey key, Object listener)
{
// TODO Auto-generated method stub
return false;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
new file mode 100644
index 0000000..2ff2b1e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -0,0 +1,197 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkHelixTestManager;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
+
+ @Test
+ public void testCbHandlerLeakOnParticipantSessionExpiry() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ MockParticipant[] participants = new MockParticipant[n];
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ ZkHelixTestManager controllerManager = controller.getManager();
+ ZkHelixTestManager participantManagerToExpire = (ZkHelixTestManager)participants[1].getManager();
+
+ // printHandlers(controllerManager);
+ // printHandlers(participantManagerToExpire);
+ int controllerHandlerNb = controllerManager.getHandlers().size();
+ int particHandlerNb = participantManagerToExpire.getHandlers().size();
+ Assert.assertEquals(controllerHandlerNb, 9, "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
+ Assert.assertEquals(particHandlerNb, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers");
+
+ // expire the session of participant
+ System.out.println("Expiring participant session...");
+ String oldSessionId = participantManagerToExpire.getSessionId();
+
+ ZkTestHelper.expireSession(participantManagerToExpire.getZkClient());
+ String newSessionId = participantManagerToExpire.getSessionId();
+ System.out.println("Expried participant session. oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ // printHandlers(controllerManager);
+ // printHandlers(participantManagerToExpire);
+ int handlerNb = controllerManager.getHandlers().size();
+ Assert.assertEquals(handlerNb, controllerHandlerNb, "controller callback handlers should not increase after participant session expiry");
+ handlerNb = participantManagerToExpire.getHandlers().size();
+ Assert.assertEquals(handlerNb, particHandlerNb, "participant callback handlers should not increase after participant session expiry");
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testCbHandlerLeakOnControllerSessionExpiry() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ MockParticipant[] participants = new MockParticipant[n];
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ ZkHelixTestManager controllerManager = controller.getManager();
+ ZkHelixTestManager participantManager = participants[0].getManager();
+
+ // printHandlers(controllerManager);
+ int controllerHandlerNb = controllerManager.getHandlers().size();
+ int particHandlerNb = participantManager.getHandlers().size();
+ Assert.assertEquals(controllerHandlerNb, 9, "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
+ Assert.assertEquals(particHandlerNb, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers");
+
+
+ // expire controller
+ System.out.println("Expiring controller session...");
+ String oldSessionId = controllerManager.getSessionId();
+
+ ZkTestHelper.expireSession(controllerManager.getZkClient());
+ String newSessionId = controllerManager.getSessionId();
+ System.out.println("Expired controller session. oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ // printHandlers(controllerManager);
+ int handlerNb = controllerManager.getHandlers().size();
+ Assert.assertEquals(handlerNb, controllerHandlerNb, "controller callback handlers should not increase after participant session expiry");
+ handlerNb = participantManager.getHandlers().size();
+ Assert.assertEquals(handlerNb, particHandlerNb, "participant callback handlers should not increase after participant session expiry");
+
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ static void printHandlers(ZkHelixTestManager manager)
+ {
+ List<CallbackHandler> handlers = manager.getHandlers();
+ System.out.println("\n" + manager.getInstanceName() + " cb-handler#: " + handlers.size());
+
+ for (int i = 0; i < handlers.size(); i++) {
+ CallbackHandler handler = handlers.get(i);
+ String path = handler.getPath();
+ System.out.println(path.substring(manager.getClusterName().length() + 1) + ": " + handler.getListener());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
index a2f0fcd..732e68a 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
@@ -64,15 +64,18 @@ public class TestLiveInstanceBounce extends ZkStandAloneCMTestBaseWithPropertySe
boolean result = ClusterStateVerifier.verifyByPolling(
new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME), 50 * 1000);
Assert.assertTrue(result);
- // When a new live instance is created, we still add current state listener to it thus number should increase by 2
- for(int j = 0; j < 10; j++)
+
+ // When a new live instance is created, we add a current-state listener for it,
+ // and we remove the current-state listener of the expired session,
+ // so the number of callback handlers is unchanged.
+ for (int j = 0; j < 10; j++)
{
- if(controller.getHandlers().size() == (handlerSize + 2))
+ if(controller.getHandlers().size() == (handlerSize))
{
break;
}
Thread.sleep(400);
}
- Assert.assertEquals( controller.getHandlers().size(), handlerSize + 2);
+ Assert.assertEquals( controller.getHandlers().size(), handlerSize);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index 093499d..c65ce24 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -29,6 +29,7 @@ import org.apache.helix.ConfigScopeBuilder;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixException;
import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
@@ -88,9 +89,10 @@ public class TestZkClusterManager extends ZkUnitTestBase
// OK
}
+ Builder keyBuilder = new Builder(controller.getClusterName());
controller.addControllerListener(listener);
AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
- controller.removeListener(listener);
+ controller.removeListener(keyBuilder.controller(), listener);
ZkHelixPropertyStore<ZNRecord> store = controller.getHelixPropertyStore();
ZNRecord record = new ZNRecord("node_1");
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java b/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
index 511623f..199c543 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.participant.DistClusterControllerStateModelFactory;
import org.apache.helix.participant.StateMachineEngine;
@@ -41,7 +42,7 @@ public class ClusterController extends Thread
private final String _controllerMode;
private final String _zkAddr;
- private HelixManager _manager;
+ private ZkHelixTestManager _manager;
public ClusterController(String clusterName, String controllerName, String zkAddr) throws Exception
{
@@ -58,20 +59,11 @@ public class ClusterController extends Thread
if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
{
- _manager =
- HelixManagerFactory.getZKHelixManager(clusterName,
- controllerName,
- InstanceType.CONTROLLER,
- zkAddr);
+ _manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
}
else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
{
- _manager =
- HelixManagerFactory.getZKHelixManager(clusterName,
- controllerName,
- InstanceType.CONTROLLER_PARTICIPANT,
- zkAddr);
-
+ _manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkAddr);
}
else
{
@@ -80,7 +72,7 @@ public class ClusterController extends Thread
}
}
- public HelixManager getManager()
+ public ZkHelixTestManager getManager()
{
return _manager;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
index b3fdeb2..8b389fe 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
@@ -505,7 +505,7 @@ public class MockParticipant extends Thread
}
}
- public HelixManager getManager()
+ public ZkHelixTestManager getManager()
{
return _manager;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 517dbdf..529b534 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -38,6 +38,7 @@ import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.MessageListener;
import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyKey;
import org.apache.helix.ScopedConfigChangeListener;
import org.apache.helix.ZNRecord;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
@@ -132,7 +133,7 @@ public class MockZKHelixManager implements HelixManager
}
@Override
- public boolean removeListener(Object listener)
+ public boolean removeListener(PropertyKey key, Object listener)
{
// TODO Auto-generated method stub
return false;