You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/16 05:59:32 UTC
git commit: Create an abstract participant service
Updated Branches:
refs/heads/helix-provisioning 852be0ccb -> 8b1763585
Create an abstract participant service
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8b176358
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8b176358
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8b176358
Branch: refs/heads/helix-provisioning
Commit: 8b176358583ca39fd2bba5725e423d9b9baba662
Parents: 852be0c
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jan 15 20:59:20 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jan 15 20:59:20 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/HelixService.java | 8 +-
.../stages/ContainerProvisioningStage.java | 2 +
.../manager/zk/AbstractParticipantService.java | 106 +++++++++++++++++++
.../helix/manager/zk/ZkHelixAutoController.java | 4 +-
.../helix/manager/zk/ZkHelixController.java | 4 +-
.../helix/manager/zk/ZkHelixParticipant.java | 4 +-
.../helix/integration/TestHelixConnection.java | 8 +-
.../integration/TestLocalContainerProvider.java | 10 +-
.../manager/zk/TestZkHelixAutoController.java | 4 +-
.../helix/manager/zk/TestZkHelixController.java | 6 +-
.../manager/zk/TestZkHelixParticipant.java | 4 +-
.../helix/examples/LogicalModelExample.java | 12 +--
.../provisioning/yarn/ContainerParticipant.java | 26 ++---
.../yarn/HelixYarnApplicationMasterMain.java | 2 +-
14 files changed, 148 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/HelixService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixService.java b/helix-core/src/main/java/org/apache/helix/HelixService.java
index a1ce0ec..40e9bae 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixService.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixService.java
@@ -24,12 +24,12 @@ package org.apache.helix;
*/
public interface HelixService {
/**
- * start helix service async
+ * start helix service
*/
- void startAsync();
+ void start();
/**
- * stop helix service async
+ * stop helix service
*/
- void stopAsync();
+ void stop();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index a17251d..fdd6d4e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -96,6 +96,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
final Cluster cluster = event.getAttribute("ClusterDataCache");
final Collection<Participant> participants = cluster.getParticipantMap().values();
+ // TODO: if a process died, we need to mark it as stopped or something
+
// Participants registered in helix
// Give those participants to targetprovider
// Provide the response that contains, new containerspecs, containers to be released,
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
new file mode 100644
index 0000000..2e5eafa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
@@ -0,0 +1,106 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+
+import com.google.common.util.concurrent.AbstractService;
+
+/**
+ * Models a Helix participant as a self-contained Guava {@link AbstractService}.
+ */
+public abstract class AbstractParticipantService extends AbstractService {
+ private final ClusterId _clusterId;
+ private final ParticipantId _participantId;
+ private HelixParticipant _participant;
+ private HelixConnection _connection;
+
+ /**
+ * Initialize the service. The participant itself is not created until the service is started.
+ * @param connection the Helix connection to use; connected on start if not already connected
+ * @param clusterId the cluster to join
+ * @param participantId a unique identifier that this participant will join with
+ */
+ public AbstractParticipantService(HelixConnection connection, ClusterId clusterId,
+ ParticipantId participantId) {
+ _connection = connection;
+ _clusterId = clusterId;
+ _participantId = participantId;
+ }
+
+ @Override
+ protected void doStart() {
+ _participant = _connection.createParticipant(_clusterId, _participantId);
+
+ // forward the participant's pre-connect callback to onPreJoinCluster()
+ _participant.addPreConnectCallback(new PreConnectCallback() {
+ @Override
+ public void onPreConnect() {
+ onPreJoinCluster();
+ }
+ });
+
+ // subclass hook: register state model factories and do any other initialization
+ init();
+
+ // connect if necessary, start the participant, then report this service as started
+ if (!_connection.isConnected()) {
+ _connection.connect();
+ }
+ _participant.start();
+ notifyStarted();
+ }
+
+ @Override
+ protected void doStop() {
+ _participant.stop();
+ notifyStopped();
+ }
+
+ /**
+ * Initialize the participant. For example, here is where you can register a state machine: <br>
+ * <br>
+ * <code>
+ * HelixParticipant participant = getParticipant();
+ * participant.getStateMachineEngine().registerStateModelFactory(stateModelDefId, factory);
+ * </code><br>
+ * <br>
+ * Called from doStart() after the participant is created and before it is started.
+ */
+ public abstract void init();
+
+ /**
+ * Complete any tasks that require a live Helix connection. Invoked via the pre-connect callback,
+ * i.e. before the participant declares itself ready to receive state transitions.
+ */
+ public abstract void onPreJoinCluster();
+
+ /**
+ * Get the participant created by this service.
+ * @return the HelixParticipant, or null if doStart() has not run yet
+ */
+ public HelixParticipant getParticipant() {
+ return _participant;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
index 136d47e..1d4b225 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
@@ -78,13 +78,13 @@ public class ZkHelixAutoController implements HelixAutoController {
}
@Override
- public void startAsync() {
+ public void start() {
_connection.addConnectionStateListener(this);
onConnected();
}
@Override
- public void stopAsync() {
+ public void stop() {
_connection.removeConnectionStateListener(this);
onDisconnecting();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
index 3091a90..51bb746 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -96,13 +96,13 @@ public class ZkHelixController implements HelixController {
}
@Override
- public void startAsync() {
+ public void start() {
_connection.addConnectionStateListener(this);
onConnected();
}
@Override
- public void stopAsync() {
+ public void stop() {
_connection.removeConnectionStateListener(this);
onDisconnecting();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
index f8f7a46..c8748d1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -440,13 +440,13 @@ public class ZkHelixParticipant implements HelixParticipant {
}
@Override
- public void startAsync() {
+ public void start() {
_connection.addConnectionStateListener(this);
onConnected();
}
@Override
- public void stopAsync() {
+ public void stop() {
_connection.removeConnectionStateListener(this);
onDisconnecting();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index b415393..9b772f2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -134,14 +134,14 @@ public class TestHelixConnection extends ZkUnitTestBase {
// start controller
HelixController controller = connection.createController(clusterId, controllerId);
- controller.startAsync();
+ controller.start();
// start participant
HelixParticipant participant = connection.createParticipant(clusterId, participantId);
participant.getStateMachineEngine().registerStateModelFactory(
StateModelDefId.from("MasterSlave"), new MockStateModelFactory());
- participant.startAsync();
+ participant.start();
// verify
final HelixDataAccessor accessor = connection.createDataAccessor(clusterId);
@@ -164,8 +164,8 @@ public class TestHelixConnection extends ZkUnitTestBase {
Assert.assertTrue(success);
// clean up
- controller.stopAsync();
- participant.stopAsync();
+ controller.stop();
+ participant.stop();
connection.disconnect();
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
index 2f1d397..7df03f3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
@@ -115,12 +115,12 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
// start controller
ControllerId controllerId = ControllerId.from("controller1");
HelixController controller = connection.createController(clusterId, controllerId);
- controller.startAsync(); // TODO: is this really async?
+ controller.start();
Thread.sleep(10000);
// clean up
- controller.stopAsync(); // TODO: is this really async?
+ controller.stop();
connection.disconnect();
Assert.assertEquals(allocated, MAX_PARTICIPANTS);
@@ -148,13 +148,13 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
_participant = connection.createParticipant(_clusterId, _participantId);
_participant.getStateMachineEngine().registerStateModelFactory(
StateModelDefId.from("MasterSlave"), new TestHelixConnection.MockStateModelFactory());
- _participant.startAsync();
+ _participant.start();
notifyStarted();
}
@Override
protected void doStop() {
- _participant.stopAsync();
+ _participant.stop();
notifyStopped();
}
@@ -242,7 +242,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
}
@Override
- public ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant ) {
+ public ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant) {
ParticipantService participantService =
new ParticipantService(_clusterId, _containerParticipants.get(containerId));
participantService.startAsync();
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
index 28b1477..856707b 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
@@ -65,7 +65,7 @@ public class TestZkHelixAutoController extends ZkUnitTestBase {
int port = 12918 + i;
ControllerId controllerId = ControllerId.from("localhost_" + port);
controllers[i] = connection.createAutoController(clusterId, controllerId);
- controllers[i].startAsync();
+ controllers[i].start();
}
// check live-instance znode for localhost_12918/12919 exists
@@ -84,7 +84,7 @@ public class TestZkHelixAutoController extends ZkUnitTestBase {
Assert.assertEquals(leader.getInstanceName(), controllers[0].getControllerId().stringify());
// stop controller localhost_12918
- controllers[0].stopAsync();
+ controllers[0].stop();
// check live-instance znode for localhost_12918 is gone
String instanceName = controllers[0].getControllerId().stringify();
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
index 0127edb..beac6aa 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
@@ -63,7 +63,7 @@ public class TestZkHelixController extends ZkUnitTestBase {
ClusterId clusterId = ClusterId.from(clusterName);
ControllerId controllerId = ControllerId.from("controller");
HelixController controller = connection.createController(clusterId, controllerId);
- controller.startAsync();
+ controller.start();
// check leader znode exists
HelixDataAccessor accessor = connection.createDataAccessor(clusterId);
@@ -73,7 +73,7 @@ public class TestZkHelixController extends ZkUnitTestBase {
Assert.assertEquals(leader.getInstanceName(), controllerId.stringify());
// stop controller
- controller.stopAsync();
+ controller.stop();
// check leader znode is gone
Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
@@ -120,7 +120,7 @@ public class TestZkHelixController extends ZkUnitTestBase {
// start controller
HelixController controller = connection.createController(clusterId, controllerId);
- controller.startAsync();
+ controller.start();
// check live-instance znode for localhost_12918 exists
final HelixDataAccessor accessor =
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
index 466d0b3..e566ef2 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
@@ -71,7 +71,7 @@ public class TestZkHelixParticipant extends ZkUnitTestBase {
participants[i].getStateMachineEngine().registerStateModelFactory(
StateModelDefId.from("MasterSlave"), new TestHelixConnection.MockStateModelFactory());
- participants[i].startAsync();
+ participants[i].start();
}
// check live-instance znode for localhost_12918/12919 exist
@@ -85,7 +85,7 @@ public class TestZkHelixParticipant extends ZkUnitTestBase {
}
// stop participant localhost_12918
- participants[0].stopAsync();
+ participants[0].stop();
// check live-instance znode for localhost_12918 is gone
Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(participants[0]
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
index 880d31c..515fdab 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
@@ -118,31 +118,31 @@ public class LogicalModelExample {
// start the controller
ControllerId controllerId = ControllerId.from("exampleController");
HelixController helixController = connection.createController(clusterId, controllerId);
- helixController.startAsync();
+ helixController.start();
// start the specified participant
HelixParticipant helixParticipant =
connection.createParticipant(clusterId, participant.getId());
helixParticipant.getStateMachineEngine().registerStateModelFactory(
lockUnlock.getStateModelDefId(), new LockUnlockFactory());
- helixParticipant.startAsync();
+ helixParticipant.start();
// start another participant via auto join
HelixParticipant autoJoinParticipant =
connection.createParticipant(clusterId, ParticipantId.from("localhost_12120"));
autoJoinParticipant.getStateMachineEngine().registerStateModelFactory(
lockUnlock.getStateModelDefId(), new LockUnlockFactory());
- autoJoinParticipant.startAsync();
+ autoJoinParticipant.start();
Thread.sleep(5000);
printExternalView(connection, clusterId, resource.getId());
// stop the participants
- helixParticipant.stopAsync();
- autoJoinParticipant.stopAsync();
+ helixParticipant.stop();
+ autoJoinParticipant.stop();
// stop the controller
- helixController.stopAsync();
+ helixController.stop();
// drop the cluster
dropCluster(clusterId, connection);
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
index 9c80d87..81a9c56 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
@@ -3,37 +3,25 @@ package org.apache.helix.provisioning.yarn;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.helix.HelixConnection;
-import org.apache.helix.HelixParticipant;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.manager.zk.AbstractParticipantService;
-import com.google.common.util.concurrent.AbstractService;
-
-public class ContainerParticipant extends AbstractService {
+public class ContainerParticipant extends AbstractParticipantService {
private static final Log LOG = LogFactory.getLog(ContainerParticipant.class);
- private final ClusterId _clusterId;
- private final ParticipantId _participantId;
- private HelixParticipant _participant;
- private HelixConnection _connection;
public ContainerParticipant(HelixConnection connection, ClusterId clusterId,
ParticipantId participantId) {
- _connection = connection;
- _clusterId = clusterId;
- _participantId = participantId;
+ super(connection, clusterId, participantId);
}
@Override
- protected void doStart() {
- _participant = _connection.createParticipant(_clusterId, _participantId);
- // register statemachine
- _participant.startAsync();
- notifyStarted();
+ public void init() {
+ // register a state model
}
@Override
- protected void doStop() {
- _participant.stopAsync();
- notifyStopped();
+ public void onPreJoinCluster() {
+ // do tasks that require a connection
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
index 2356e91..c84b627 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
@@ -115,7 +115,7 @@ public class HelixYarnApplicationMasterMain {
// start controller
ControllerId controllerId = ControllerId.from("controller1");
HelixController controller = connection.createController(clusterId, controllerId);
- controller.startAsync();
+ controller.start();
Thread shutdownhook = new Thread(new Runnable() {
@Override