You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/11/22 05:04:43 UTC

[1/2] helix git commit: [HELIX-550] Shutdown GenericHelixController on disconnect

Repository: helix
Updated Branches:
  refs/heads/master 8e58aa5ad -> a0ab2b2e3


[HELIX-550] Shutdown GenericHelixController on disconnect


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bfb4a3d3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bfb4a3d3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bfb4a3d3

Branch: refs/heads/master
Commit: bfb4a3d34228f5c3806b1eee9e98f401386e66a9
Parents: 122ebf5
Author: Kanak Biscuitwala <ka...@hotmail.com>
Authored: Tue Nov 18 21:23:29 2014 -0800
Committer: Kanak Biscuitwala <ka...@hotmail.com>
Committed: Tue Nov 18 21:23:29 2014 -0800

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 12 ++-
 .../helix/manager/zk/ZkHelixController.java     | 77 ++++++++++++--------
 2 files changed, 55 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/bfb4a3d3/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index aef636e..113cace 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -92,9 +92,9 @@ import com.google.common.collect.Lists;
  */
 public class GenericHelixController implements IdealStateChangeListener,
     LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
-    ControllerChangeListener, InstanceConfigChangeListener,
-    ScopedConfigChangeListener {
+    ControllerChangeListener, InstanceConfigChangeListener, ScopedConfigChangeListener {
   private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
+  private static final long EVENT_THREAD_JOIN_TIMEOUT = 1000L;
   volatile boolean init = false;
   private final PipelineRegistry _registry;
 
@@ -621,6 +621,14 @@ public class GenericHelixController implements IdealStateChangeListener,
     }
   }
 
+  public void shutdown() throws InterruptedException {
+    stopRebalancingTimer();
+    while (_eventThread.isAlive()) {
+      _eventThread.interrupt();
+      _eventThread.join(EVENT_THREAD_JOIN_TIMEOUT);
+    }
+  }
+
   private class ClusterEventProcessor extends Thread {
     @Override
     public void run() {

http://git-wip-us.apache.org/repos/asf/helix/blob/bfb4a3d3/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
index 295b69c..fafe604 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -47,24 +47,25 @@ import org.apache.log4j.Logger;
 public class ZkHelixController implements HelixController {
   private static Logger LOG = Logger.getLogger(ZkHelixController.class);
 
-  final ZkHelixConnection _connection;
-  final ClusterId _clusterId;
-  final ControllerId _controllerId;
-  final GenericHelixController _pipeline;
-  final DefaultMessagingService _messagingService;
-  final List<HelixTimerTask> _timerTasks;
-  final ClusterAccessor _clusterAccessor;
-  final HelixDataAccessor _accessor;
-  final HelixManager _manager;
-  final ZkHelixLeaderElection _leaderElection;
-  boolean _isStarted;
+  private final ZkHelixConnection _connection;
+  private final ClusterId _clusterId;
+  private final ControllerId _controllerId;
+  private final DefaultMessagingService _messagingService;
+  private final List<HelixTimerTask> _timerTasks;
+  @SuppressWarnings("unused")
+  private final ClusterAccessor _clusterAccessor;
+  private final HelixDataAccessor _accessor;
+  private final HelixManager _manager;
+  private boolean _isStarted;
+
+  private GenericHelixController _pipeline;
+  private ZkHelixLeaderElection _leaderElection;
 
   public ZkHelixController(ZkHelixConnection connection, ClusterId clusterId,
       ControllerId controllerId) {
     _connection = connection;
     _clusterId = clusterId;
     _controllerId = controllerId;
-    _pipeline = new GenericHelixController();
     _clusterAccessor = connection.createClusterAccessor(clusterId);
     _accessor = connection.createDataAccessor(clusterId);
 
@@ -72,7 +73,6 @@ public class ZkHelixController implements HelixController {
     _timerTasks = new ArrayList<HelixTimerTask>();
 
     _manager = new ZKHelixManager(this);
-    _leaderElection = new ZkHelixLeaderElection(this, _pipeline);
 
     _timerTasks.add(new StatusDumpTask(clusterId, _manager.getHelixDataAccessor()));
   }
@@ -112,34 +112,47 @@ public class ZkHelixController implements HelixController {
   }
 
   void reset() {
-    /**
-     * reset all handlers, make sure cleanup completed for previous session
-     * disconnect if fail to cleanup
-     */
-    _connection.resetHandlers(this);
+    // clean up old pipeline instance
+    if (_leaderElection != null) {
+      _connection.removeListener(this, _leaderElection, _accessor.keyBuilder().controller());
+    }
+    if (_pipeline != null) {
+      try {
+        _pipeline.shutdown();
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted shutting down GenericHelixController", e);
+      } finally {
+        _pipeline = null;
+        _leaderElection = null;
+      }
+    }
 
+    // reset all handlers, make sure cleanup completed for previous session
+    // disconnect if fail to cleanup
+    _connection.resetHandlers(this);
   }
 
   void init() {
-    /**
-     * from here on, we are dealing with new session
-     * init handlers
-     */
+    // from here on, we are dealing with new session
+
+    // init handlers
     if (!ZKUtil.isClusterSetup(_clusterId.toString(), _connection._zkclient)) {
       throw new HelixException("Cluster structure is not set up for cluster: " + _clusterId);
     }
 
-    /**
-     * leader-election listener should be reset/init before all other controller listeners;
-     * it's ok to add a listener multiple times, since we check existence in
-     * ZkHelixConnection#addXXXListner()
-     */
-    _connection.addControllerListener(this, _leaderElection, _clusterId);
+    // Recreate the pipeline on a new connection
+    if (_pipeline == null) {
+      _pipeline = new GenericHelixController();
+      _leaderElection = new ZkHelixLeaderElection(this, _pipeline);
 
-    /**
-     * ok to init message handler and controller handlers twice
-     * the second init will be skipped (see CallbackHandler)
-     */
+      // leader-election listener should be reset/init before all other controller listeners;
+      // it's ok to add a listener multiple times, since we check existence in
+      // ZkHelixConnection#addXXXListner()
+      _connection.addControllerListener(this, _leaderElection, _clusterId);
+    }
+
+    // ok to init message handler and controller handlers twice
+    // the second init will be skipped (see CallbackHandler)
     _connection.initHandlers(this);
   }
 


[2/2] helix git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/helix

Posted by ka...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/helix


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a0ab2b2e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a0ab2b2e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a0ab2b2e

Branch: refs/heads/master
Commit: a0ab2b2e34311aab9f2c211970758ffaea8b2397
Parents: bfb4a3d 8e58aa5
Author: Kanak Biscuitwala <ka...@hotmail.com>
Authored: Fri Nov 21 20:01:49 2014 -0800
Committer: Kanak Biscuitwala <ka...@hotmail.com>
Committed: Fri Nov 21 20:01:49 2014 -0800

----------------------------------------------------------------------
 .../helix/agent/AgentStateModelFactory.java     |   3 +-
 .../main/java/org/apache/helix/PropertyKey.java |   9 ++
 .../api/StateTransitionHandlerFactory.java      |  63 ++++++----
 .../handling/HelixStateTransitionHandler.java   |   2 +-
 .../GenericLeaderStandbyStateModelFactory.java  |   4 +-
 .../participant/HelixStateMachineEngine.java    |  51 ++++----
 ...usterControllerTransitionHandlerFactory.java |   4 +-
 .../statemachine/ScheduledTaskStateModel.java   |   9 +-
 .../ScheduledTaskStateModelFactory.java         |   7 +-
 .../helix/task/TaskStateModelFactory.java       |   3 +-
 .../org/apache/helix/TestHelixTaskExecutor.java |   2 +-
 .../org/apache/helix/TestHelixTaskHandler.java  |   2 +-
 .../TestCorrectnessOnConnectivityLoss.java      |   3 +-
 .../integration/TestEntropyFreeNodeBounce.java  |  16 +++
 .../helix/integration/TestHelixConnection.java  |   2 +-
 .../helix/integration/TestMessageThrottle2.java |   3 +-
 .../TestPartitionLevelTransitionConstraint.java |   3 +-
 .../integration/TestPreferenceListAsQueue.java  |   8 +-
 .../TestResourceWithSamePartitionKey.java       | 125 +++++++++++++++++++
 .../integration/TestStateTransitionTimeout.java |  12 +-
 .../helix/integration/TestZkReconnect.java      |   2 +-
 .../integration/manager/TestStateModelLeak.java |  21 ++--
 .../helix/mock/participant/DummyProcess.java    |   7 +-
 .../participant/MockBootstrapModelFactory.java  |   8 +-
 .../mock/participant/MockMSModelFactory.java    |  12 +-
 .../participant/MockSchemataModelFactory.java   |   4 +-
 ...MultiClusterControllerStateModelFactory.java |   3 +-
 .../apache/helix/examples/BootstrapHandler.java |   3 +-
 .../apache/helix/examples/DummyParticipant.java |   3 +-
 .../LeaderStandbyStateModelFactory.java         |   3 +-
 .../helix/examples/LogicalModelExample.java     |   2 +-
 .../examples/MasterSlaveStateModelFactory.java  |   3 +-
 .../OnlineOfflineStateModelFactory.java         |   3 +-
 .../helix/ipc/TestNettyHelixIPCService.java     |   3 +-
 .../StatelessServiceStateModelFactory.java      |   3 +-
 .../apache/helix/lockmanager/LockFactory.java   |   3 +-
 .../ConsumerTransitionHandlerFactory.java       |   3 +-
 .../filestore/FileStoreStateModelFactory.java   |   3 +-
 .../taskexecution/TaskStateModelFactory.java    |   3 +-
 .../userdefinedrebalancer/LockFactory.java      |   3 +-
 40 files changed, 324 insertions(+), 102 deletions(-)
----------------------------------------------------------------------