You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/16 05:59:32 UTC

git commit: Create an abstract participant service

Updated Branches:
  refs/heads/helix-provisioning 852be0ccb -> 8b1763585


Create an abstract participant service


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8b176358
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8b176358
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8b176358

Branch: refs/heads/helix-provisioning
Commit: 8b176358583ca39fd2bba5725e423d9b9baba662
Parents: 852be0c
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jan 15 20:59:20 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jan 15 20:59:20 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixService.java     |   8 +-
 .../stages/ContainerProvisioningStage.java      |   2 +
 .../manager/zk/AbstractParticipantService.java  | 106 +++++++++++++++++++
 .../helix/manager/zk/ZkHelixAutoController.java |   4 +-
 .../helix/manager/zk/ZkHelixController.java     |   4 +-
 .../helix/manager/zk/ZkHelixParticipant.java    |   4 +-
 .../helix/integration/TestHelixConnection.java  |   8 +-
 .../integration/TestLocalContainerProvider.java |  10 +-
 .../manager/zk/TestZkHelixAutoController.java   |   4 +-
 .../helix/manager/zk/TestZkHelixController.java |   6 +-
 .../manager/zk/TestZkHelixParticipant.java      |   4 +-
 .../helix/examples/LogicalModelExample.java     |  12 +--
 .../provisioning/yarn/ContainerParticipant.java |  26 ++---
 .../yarn/HelixYarnApplicationMasterMain.java    |   2 +-
 14 files changed, 148 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/HelixService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixService.java b/helix-core/src/main/java/org/apache/helix/HelixService.java
index a1ce0ec..40e9bae 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixService.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixService.java
@@ -24,12 +24,12 @@ package org.apache.helix;
  */
 public interface HelixService {
   /**
-   * start helix service async
+   * start helix service
    */
-  void startAsync();
+  void start();
 
   /**
-   * stop helix service async
+   * stop helix service
    */
-  void stopAsync();
+  void stop();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index a17251d..fdd6d4e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -96,6 +96,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
         final Cluster cluster = event.getAttribute("ClusterDataCache");
         final Collection<Participant> participants = cluster.getParticipantMap().values();
 
+        // TODO: if a process died, we need to mark it as stopped or something
+
         // Participants registered in helix
         // Give those participants to targetprovider
         // Provide the response that contains, new containerspecs, containers to be released,

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
new file mode 100644
index 0000000..2e5eafa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
@@ -0,0 +1,106 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+
+import com.google.common.util.concurrent.AbstractService;
+
+/**
+ * Base class that runs a Helix participant as a self-contained Guava {@link AbstractService}.
+ */
+public abstract class AbstractParticipantService extends AbstractService {
+  private final ClusterId _clusterId;
+  private final ParticipantId _participantId;
+  private HelixParticipant _participant;
+  private HelixConnection _connection;
+
+  /**
+   * Initialize the service. The participant itself is not created until the service is started.
+   * @param connection a Helix connection; it is connected on start if not already connected
+   * @param clusterId the cluster to join
+   * @param participantId a unique identifier that this participant will join with
+   */
+  public AbstractParticipantService(HelixConnection connection, ClusterId clusterId,
+      ParticipantId participantId) {
+    _connection = connection;
+    _clusterId = clusterId;
+    _participantId = participantId;
+  }
+
+  @Override
+  protected void doStart() {
+    _participant = _connection.createParticipant(_clusterId, _participantId);
+
+    // route the participant's pre-connect callback to the subclass hook onPreJoinCluster()
+    _participant.addPreConnectCallback(new PreConnectCallback() {
+      @Override
+      public void onPreConnect() {
+        onPreJoinCluster();
+      }
+    });
+
+    // subclass initialization (e.g. state model registration); the participant exists by now
+    init();
+
+    // connect on demand, start the participant, then signal that startup has completed
+    if (!_connection.isConnected()) {
+      _connection.connect();
+    }
+    _participant.start();
+    notifyStarted();
+  }
+
+  @Override
+  protected void doStop() {
+    _participant.stop();
+    notifyStopped();
+  }
+
+  /**
+   * Initialize the participant. For example, here is where you can register a state machine: <br/>
+   * <br/>
+   * <code>
+   * HelixParticipant participant = getParticipant();
+   * participant.getStateMachineEngine().registerStateModelFactory(stateModelDefId, factory);
+   * </code><br/>
+   * <br/>
+   * This is called on service start, after the participant is created and before it is started.
+   */
+  public abstract void init();
+
+  /**
+   * Complete any tasks that require a live Helix connection. Runs from the pre-connect callback,
+   * i.e. before the participant declares itself ready to receive state transitions.
+   */
+  public abstract void onPreJoinCluster();
+
+  /**
+   * Get the participant backing this service; it is created when the service is started.
+   * @return the HelixParticipant, or null if the service has not been started yet
+   */
+  public HelixParticipant getParticipant() {
+    return _participant;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
index 136d47e..1d4b225 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
@@ -78,13 +78,13 @@ public class ZkHelixAutoController implements HelixAutoController {
   }
 
   @Override
-  public void startAsync() {
+  public void start() {
     _connection.addConnectionStateListener(this);
     onConnected();
   }
 
   @Override
-  public void stopAsync() {
+  public void stop() {
     _connection.removeConnectionStateListener(this);
     onDisconnecting();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
index 3091a90..51bb746 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -96,13 +96,13 @@ public class ZkHelixController implements HelixController {
   }
 
   @Override
-  public void startAsync() {
+  public void start() {
     _connection.addConnectionStateListener(this);
     onConnected();
   }
 
   @Override
-  public void stopAsync() {
+  public void stop() {
     _connection.removeConnectionStateListener(this);
     onDisconnecting();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
index f8f7a46..c8748d1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -440,13 +440,13 @@ public class ZkHelixParticipant implements HelixParticipant {
   }
 
   @Override
-  public void startAsync() {
+  public void start() {
     _connection.addConnectionStateListener(this);
     onConnected();
   }
 
   @Override
-  public void stopAsync() {
+  public void stop() {
     _connection.removeConnectionStateListener(this);
     onDisconnecting();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index b415393..9b772f2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -134,14 +134,14 @@ public class TestHelixConnection extends ZkUnitTestBase {
 
     // start controller
     HelixController controller = connection.createController(clusterId, controllerId);
-    controller.startAsync();
+    controller.start();
 
     // start participant
     HelixParticipant participant = connection.createParticipant(clusterId, participantId);
     participant.getStateMachineEngine().registerStateModelFactory(
         StateModelDefId.from("MasterSlave"), new MockStateModelFactory());
 
-    participant.startAsync();
+    participant.start();
 
     // verify
     final HelixDataAccessor accessor = connection.createDataAccessor(clusterId);
@@ -164,8 +164,8 @@ public class TestHelixConnection extends ZkUnitTestBase {
     Assert.assertTrue(success);
 
     // clean up
-    controller.stopAsync();
-    participant.stopAsync();
+    controller.stop();
+    participant.stop();
     connection.disconnect();
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
index 2f1d397..7df03f3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
@@ -115,12 +115,12 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
     // start controller
     ControllerId controllerId = ControllerId.from("controller1");
     HelixController controller = connection.createController(clusterId, controllerId);
-    controller.startAsync(); // TODO: is this really async?
+    controller.start();
 
     Thread.sleep(10000);
 
     // clean up
-    controller.stopAsync(); // TODO: is this really async?
+    controller.stop();
     connection.disconnect();
 
     Assert.assertEquals(allocated, MAX_PARTICIPANTS);
@@ -148,13 +148,13 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
       _participant = connection.createParticipant(_clusterId, _participantId);
       _participant.getStateMachineEngine().registerStateModelFactory(
           StateModelDefId.from("MasterSlave"), new TestHelixConnection.MockStateModelFactory());
-      _participant.startAsync();
+      _participant.start();
       notifyStarted();
     }
 
     @Override
     protected void doStop() {
-      _participant.stopAsync();
+      _participant.stop();
       notifyStopped();
     }
 
@@ -242,7 +242,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
     }
 
     @Override
-    public ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant ) {
+    public ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant) {
       ParticipantService participantService =
           new ParticipantService(_clusterId, _containerParticipants.get(containerId));
       participantService.startAsync();

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
index 28b1477..856707b 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
@@ -65,7 +65,7 @@ public class TestZkHelixAutoController extends ZkUnitTestBase {
       int port = 12918 + i;
       ControllerId controllerId = ControllerId.from("localhost_" + port);
       controllers[i] = connection.createAutoController(clusterId, controllerId);
-      controllers[i].startAsync();
+      controllers[i].start();
     }
 
     // check live-instance znode for localhost_12918/12919 exists
@@ -84,7 +84,7 @@ public class TestZkHelixAutoController extends ZkUnitTestBase {
     Assert.assertEquals(leader.getInstanceName(), controllers[0].getControllerId().stringify());
 
     // stop controller localhost_12918
-    controllers[0].stopAsync();
+    controllers[0].stop();
 
     // check live-instance znode for localhost_12918 is gone
     String instanceName = controllers[0].getControllerId().stringify();

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
index 0127edb..beac6aa 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
@@ -63,7 +63,7 @@ public class TestZkHelixController extends ZkUnitTestBase {
     ClusterId clusterId = ClusterId.from(clusterName);
     ControllerId controllerId = ControllerId.from("controller");
     HelixController controller = connection.createController(clusterId, controllerId);
-    controller.startAsync();
+    controller.start();
 
     // check leader znode exists
     HelixDataAccessor accessor = connection.createDataAccessor(clusterId);
@@ -73,7 +73,7 @@ public class TestZkHelixController extends ZkUnitTestBase {
     Assert.assertEquals(leader.getInstanceName(), controllerId.stringify());
 
     // stop participant
-    controller.stopAsync();
+    controller.stop();
 
     // check leader znode is gone
     Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
@@ -120,7 +120,7 @@ public class TestZkHelixController extends ZkUnitTestBase {
 
     // start controller
     HelixController controller = connection.createController(clusterId, controllerId);
-    controller.startAsync();
+    controller.start();
 
     // check live-instance znode for localhost_12918 exists
     final HelixDataAccessor accessor =

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
index 466d0b3..e566ef2 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
@@ -71,7 +71,7 @@ public class TestZkHelixParticipant extends ZkUnitTestBase {
       participants[i].getStateMachineEngine().registerStateModelFactory(
           StateModelDefId.from("MasterSlave"), new TestHelixConnection.MockStateModelFactory());
 
-      participants[i].startAsync();
+      participants[i].start();
     }
 
     // check live-instance znode for localhost_12918/12919 exist
@@ -85,7 +85,7 @@ public class TestZkHelixParticipant extends ZkUnitTestBase {
     }
 
     // stop participant localhost_12918
-    participants[0].stopAsync();
+    participants[0].stop();
 
     // check live-instance znode for localhost_12918 is gone
     Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(participants[0]

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
index 880d31c..515fdab 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
@@ -118,31 +118,31 @@ public class LogicalModelExample {
     // start the controller
     ControllerId controllerId = ControllerId.from("exampleController");
     HelixController helixController = connection.createController(clusterId, controllerId);
-    helixController.startAsync();
+    helixController.start();
 
     // start the specified participant
     HelixParticipant helixParticipant =
         connection.createParticipant(clusterId, participant.getId());
     helixParticipant.getStateMachineEngine().registerStateModelFactory(
         lockUnlock.getStateModelDefId(), new LockUnlockFactory());
-    helixParticipant.startAsync();
+    helixParticipant.start();
 
     // start another participant via auto join
     HelixParticipant autoJoinParticipant =
         connection.createParticipant(clusterId, ParticipantId.from("localhost_12120"));
     autoJoinParticipant.getStateMachineEngine().registerStateModelFactory(
         lockUnlock.getStateModelDefId(), new LockUnlockFactory());
-    autoJoinParticipant.startAsync();
+    autoJoinParticipant.start();
 
     Thread.sleep(5000);
     printExternalView(connection, clusterId, resource.getId());
 
     // stop the participants
-    helixParticipant.stopAsync();
-    autoJoinParticipant.stopAsync();
+    helixParticipant.stop();
+    autoJoinParticipant.stop();
 
     // stop the controller
-    helixController.stopAsync();
+    helixController.stop();
 
     // drop the cluster
     dropCluster(clusterId, connection);

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
index 9c80d87..81a9c56 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
@@ -3,37 +3,25 @@ package org.apache.helix.provisioning.yarn;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.helix.HelixConnection;
-import org.apache.helix.HelixParticipant;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.manager.zk.AbstractParticipantService;
 
-import com.google.common.util.concurrent.AbstractService;
-
-public class ContainerParticipant extends AbstractService {
+public class ContainerParticipant extends AbstractParticipantService {
   private static final Log LOG = LogFactory.getLog(ContainerParticipant.class);
-  private final ClusterId _clusterId;
-  private final ParticipantId _participantId;
-  private HelixParticipant _participant;
-  private HelixConnection _connection;
 
   public ContainerParticipant(HelixConnection connection, ClusterId clusterId,
       ParticipantId participantId) {
-    _connection = connection;
-    _clusterId = clusterId;
-    _participantId = participantId;
+    super(connection, clusterId, participantId);
   }
 
   @Override
-  protected void doStart() {
-    _participant = _connection.createParticipant(_clusterId, _participantId);
-    // register statemachine
-    _participant.startAsync();
-    notifyStarted();
+  public void init() {
+    // register a state model
   }
 
   @Override
-  protected void doStop() {
-    _participant.stopAsync();
-    notifyStopped();
+  public void onPreJoinCluster() {
+    // do tasks that require a connection
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
index 2356e91..c84b627 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
@@ -115,7 +115,7 @@ public class HelixYarnApplicationMasterMain {
     // start controller
     ControllerId controllerId = ControllerId.from("controller1");
     HelixController controller = connection.createController(clusterId, controllerId);
-    controller.startAsync();
+    controller.start();
 
     Thread shutdownhook = new Thread(new Runnable() {
       @Override