You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/11/22 05:04:43 UTC
[1/2] helix git commit: [HELIX-550] Shutdown GenericHelixController on disconnect
Repository: helix
Updated Branches:
refs/heads/master 8e58aa5ad -> a0ab2b2e3
[HELIX-550] Shutdown GenericHelixController on disconnect
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bfb4a3d3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bfb4a3d3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bfb4a3d3
Branch: refs/heads/master
Commit: bfb4a3d34228f5c3806b1eee9e98f401386e66a9
Parents: 122ebf5
Author: Kanak Biscuitwala <ka...@hotmail.com>
Authored: Tue Nov 18 21:23:29 2014 -0800
Committer: Kanak Biscuitwala <ka...@hotmail.com>
Committed: Tue Nov 18 21:23:29 2014 -0800
----------------------------------------------------------------------
.../controller/GenericHelixController.java | 12 ++-
.../helix/manager/zk/ZkHelixController.java | 77 ++++++++++++--------
2 files changed, 55 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/bfb4a3d3/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index aef636e..113cace 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -92,9 +92,9 @@ import com.google.common.collect.Lists;
*/
public class GenericHelixController implements IdealStateChangeListener,
LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
- ControllerChangeListener, InstanceConfigChangeListener,
- ScopedConfigChangeListener {
+ ControllerChangeListener, InstanceConfigChangeListener, ScopedConfigChangeListener {
private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
+ private static final long EVENT_THREAD_JOIN_TIMEOUT = 1000L;
volatile boolean init = false;
private final PipelineRegistry _registry;
@@ -621,6 +621,14 @@ public class GenericHelixController implements IdealStateChangeListener,
}
}
+ public void shutdown() throws InterruptedException {
+ stopRebalancingTimer();
+ while (_eventThread.isAlive()) {
+ _eventThread.interrupt();
+ _eventThread.join(EVENT_THREAD_JOIN_TIMEOUT);
+ }
+ }
+
private class ClusterEventProcessor extends Thread {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/helix/blob/bfb4a3d3/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
index 295b69c..fafe604 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -47,24 +47,25 @@ import org.apache.log4j.Logger;
public class ZkHelixController implements HelixController {
private static Logger LOG = Logger.getLogger(ZkHelixController.class);
- final ZkHelixConnection _connection;
- final ClusterId _clusterId;
- final ControllerId _controllerId;
- final GenericHelixController _pipeline;
- final DefaultMessagingService _messagingService;
- final List<HelixTimerTask> _timerTasks;
- final ClusterAccessor _clusterAccessor;
- final HelixDataAccessor _accessor;
- final HelixManager _manager;
- final ZkHelixLeaderElection _leaderElection;
- boolean _isStarted;
+ private final ZkHelixConnection _connection;
+ private final ClusterId _clusterId;
+ private final ControllerId _controllerId;
+ private final DefaultMessagingService _messagingService;
+ private final List<HelixTimerTask> _timerTasks;
+ @SuppressWarnings("unused")
+ private final ClusterAccessor _clusterAccessor;
+ private final HelixDataAccessor _accessor;
+ private final HelixManager _manager;
+ private boolean _isStarted;
+
+ private GenericHelixController _pipeline;
+ private ZkHelixLeaderElection _leaderElection;
public ZkHelixController(ZkHelixConnection connection, ClusterId clusterId,
ControllerId controllerId) {
_connection = connection;
_clusterId = clusterId;
_controllerId = controllerId;
- _pipeline = new GenericHelixController();
_clusterAccessor = connection.createClusterAccessor(clusterId);
_accessor = connection.createDataAccessor(clusterId);
@@ -72,7 +73,6 @@ public class ZkHelixController implements HelixController {
_timerTasks = new ArrayList<HelixTimerTask>();
_manager = new ZKHelixManager(this);
- _leaderElection = new ZkHelixLeaderElection(this, _pipeline);
_timerTasks.add(new StatusDumpTask(clusterId, _manager.getHelixDataAccessor()));
}
@@ -112,34 +112,47 @@ public class ZkHelixController implements HelixController {
}
void reset() {
- /**
- * reset all handlers, make sure cleanup completed for previous session
- * disconnect if fail to cleanup
- */
- _connection.resetHandlers(this);
+ // clean up old pipeline instance
+ if (_leaderElection != null) {
+ _connection.removeListener(this, _leaderElection, _accessor.keyBuilder().controller());
+ }
+ if (_pipeline != null) {
+ try {
+ _pipeline.shutdown();
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted shutting down GenericHelixController", e);
+ } finally {
+ _pipeline = null;
+ _leaderElection = null;
+ }
+ }
+ // reset all handlers, make sure cleanup completed for previous session
+ // disconnect if fail to cleanup
+ _connection.resetHandlers(this);
}
void init() {
- /**
- * from here on, we are dealing with new session
- * init handlers
- */
+ // from here on, we are dealing with new session
+
+ // init handlers
if (!ZKUtil.isClusterSetup(_clusterId.toString(), _connection._zkclient)) {
throw new HelixException("Cluster structure is not set up for cluster: " + _clusterId);
}
- /**
- * leader-election listener should be reset/init before all other controller listeners;
- * it's ok to add a listener multiple times, since we check existence in
- * ZkHelixConnection#addXXXListner()
- */
- _connection.addControllerListener(this, _leaderElection, _clusterId);
+ // Recreate the pipeline on a new connection
+ if (_pipeline == null) {
+ _pipeline = new GenericHelixController();
+ _leaderElection = new ZkHelixLeaderElection(this, _pipeline);
- /**
- * ok to init message handler and controller handlers twice
- * the second init will be skipped (see CallbackHandler)
- */
+ // leader-election listener should be reset/init before all other controller listeners;
+ // it's ok to add a listener multiple times, since we check existence in
+ // ZkHelixConnection#addXXXListner()
+ _connection.addControllerListener(this, _leaderElection, _clusterId);
+ }
+
+ // ok to init message handler and controller handlers twice
+ // the second init will be skipped (see CallbackHandler)
_connection.initHandlers(this);
}
[2/2] helix git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/helix
Posted by ka...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/helix
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a0ab2b2e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a0ab2b2e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a0ab2b2e
Branch: refs/heads/master
Commit: a0ab2b2e34311aab9f2c211970758ffaea8b2397
Parents: bfb4a3d 8e58aa5
Author: Kanak Biscuitwala <ka...@hotmail.com>
Authored: Fri Nov 21 20:01:49 2014 -0800
Committer: Kanak Biscuitwala <ka...@hotmail.com>
Committed: Fri Nov 21 20:01:49 2014 -0800
----------------------------------------------------------------------
.../helix/agent/AgentStateModelFactory.java | 3 +-
.../main/java/org/apache/helix/PropertyKey.java | 9 ++
.../api/StateTransitionHandlerFactory.java | 63 ++++++----
.../handling/HelixStateTransitionHandler.java | 2 +-
.../GenericLeaderStandbyStateModelFactory.java | 4 +-
.../participant/HelixStateMachineEngine.java | 51 ++++----
...usterControllerTransitionHandlerFactory.java | 4 +-
.../statemachine/ScheduledTaskStateModel.java | 9 +-
.../ScheduledTaskStateModelFactory.java | 7 +-
.../helix/task/TaskStateModelFactory.java | 3 +-
.../org/apache/helix/TestHelixTaskExecutor.java | 2 +-
.../org/apache/helix/TestHelixTaskHandler.java | 2 +-
.../TestCorrectnessOnConnectivityLoss.java | 3 +-
.../integration/TestEntropyFreeNodeBounce.java | 16 +++
.../helix/integration/TestHelixConnection.java | 2 +-
.../helix/integration/TestMessageThrottle2.java | 3 +-
.../TestPartitionLevelTransitionConstraint.java | 3 +-
.../integration/TestPreferenceListAsQueue.java | 8 +-
.../TestResourceWithSamePartitionKey.java | 125 +++++++++++++++++++
.../integration/TestStateTransitionTimeout.java | 12 +-
.../helix/integration/TestZkReconnect.java | 2 +-
.../integration/manager/TestStateModelLeak.java | 21 ++--
.../helix/mock/participant/DummyProcess.java | 7 +-
.../participant/MockBootstrapModelFactory.java | 8 +-
.../mock/participant/MockMSModelFactory.java | 12 +-
.../participant/MockSchemataModelFactory.java | 4 +-
...MultiClusterControllerStateModelFactory.java | 3 +-
.../apache/helix/examples/BootstrapHandler.java | 3 +-
.../apache/helix/examples/DummyParticipant.java | 3 +-
.../LeaderStandbyStateModelFactory.java | 3 +-
.../helix/examples/LogicalModelExample.java | 2 +-
.../examples/MasterSlaveStateModelFactory.java | 3 +-
.../OnlineOfflineStateModelFactory.java | 3 +-
.../helix/ipc/TestNettyHelixIPCService.java | 3 +-
.../StatelessServiceStateModelFactory.java | 3 +-
.../apache/helix/lockmanager/LockFactory.java | 3 +-
.../ConsumerTransitionHandlerFactory.java | 3 +-
.../filestore/FileStoreStateModelFactory.java | 3 +-
.../taskexecution/TaskStateModelFactory.java | 3 +-
.../userdefinedrebalancer/LockFactory.java | 3 +-
40 files changed, 324 insertions(+), 102 deletions(-)
----------------------------------------------------------------------