You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/02/22 03:13:57 UTC
git commit: Shutdown message
Repository: helix
Updated Branches:
refs/heads/helix-provisioning b3dacb7ab -> c22cdd98a
Shutdown message
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c22cdd98
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c22cdd98
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c22cdd98
Branch: refs/heads/helix-provisioning
Commit: c22cdd98a51595d796925a682fcead7f0a90f881
Parents: b3dacb7
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Feb 21 18:13:47 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Feb 21 18:13:47 2014 -0800
----------------------------------------------------------------------
.../stages/ContainerProvisioningStage.java | 22 +++++-
.../manager/zk/AbstractParticipantService.java | 2 +-
.../java/org/apache/helix/model/Message.java | 3 +-
.../org/apache/helix/tools/ClusterSetup.java | 2 +
.../provisioning/yarn/ParticipantLauncher.java | 81 +++++++++++++++++++-
5 files changed, 102 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c22cdd98/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index f258525..bc3e0c6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
@@ -45,6 +46,8 @@ import org.apache.helix.controller.provisioner.ProvisionerRef;
import org.apache.helix.controller.provisioner.TargetProvider;
import org.apache.helix.controller.provisioner.TargetProviderResponse;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
import org.apache.log4j.Logger;
import com.google.common.util.concurrent.FutureCallback;
@@ -123,6 +126,11 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
LOG.info("Participant " + participantId + " is ready, marking as CONNECTED");
updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
ContainerState.CONNECTED);
+ } else if (!participant.isAlive() && ContainerState.HALTING.equals(containerState)) {
+ // Need to mark as connected only when the live instance is visible
+ LOG.info("Participant " + participantId + " is has been killed, marking as HALTED");
+ updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+ ContainerState.HALTED);
}
}
}
@@ -262,9 +270,17 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
- LOG.info("Container " + containerId + " stopped. Marking " + participant.getId());
- updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
- ContainerState.HALTED);
+ // Don't update the state here, wait for the live instance, but do send a shutdown
+ // message
+ LOG.info("Container " + containerId + " stopped for " + participant.getId());
+ if (participant.isAlive()) {
+ Message message = new Message(MessageType.SHUTDOWN, UUID.randomUUID().toString());
+ message.setTgtName(participant.getId().toString());
+ message.setTgtSessionId("*");
+ message.setMsgId(message.getId());
+ accessor.createProperty(
+ keyBuilder.message(participant.getId().toString(), message.getId()), message);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/c22cdd98/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
index f515092..49a7159 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
@@ -111,7 +111,7 @@ public abstract class AbstractParticipantService extends AbstractService {
* Get an instantiated participant instance.
* @return HelixParticipant
*/
- protected HelixParticipant getParticipant() {
+ public HelixParticipant getParticipant() {
return _participant;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c22cdd98/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index d465a80..dcd77d9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -60,7 +60,8 @@ public class Message extends HelixProperty {
CONTROLLER_MSG,
TASK_REPLY,
NO_OP,
- PARTICIPANT_ERROR_REPORT
+ PARTICIPANT_ERROR_REPORT,
+ SHUTDOWN
};
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/c22cdd98/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 0247846..d97a853 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -166,6 +166,8 @@ public class ClusterSetup {
addStateModelDef(clusterName, "Task",
new StateModelDefinition(StateModelConfigGenerator.generateConfigForTaskStateModel()));
+ addStateModelDef(clusterName, "StatelessService", new StateModelDefinition(
+ StateModelConfigGenerator.generateConfigForStatelessService()));
}
public void activateCluster(String clusterName, String grandCluster, boolean enable) {
http://git-wip-us.apache.org/repos/asf/helix/blob/c22cdd98/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
index e9b6795..1a21a71 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
@@ -7,13 +7,19 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.helix.HelixConnection;
+import org.apache.helix.NotificationContext;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.manager.zk.AbstractParticipantService;
import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
import org.apache.log4j.Logger;
+
/**
- *
* Main class that invokes the Participant Api
*/
public class ParticipantLauncher {
@@ -31,7 +37,7 @@ public class ParticipantLauncher {
try {
CommandLine cliParser = new GnuParser().parse(opts, args);
String zkAddress = cliParser.getOptionValue("zkAddress");
- HelixConnection connection = new ZkHelixConnection(zkAddress);
+ final HelixConnection connection = new ZkHelixConnection(zkAddress);
connection.connect();
ClusterId clusterId = ClusterId.from(cliParser.getOptionValue("cluster"));
ParticipantId participantId = ParticipantId.from(cliParser.getOptionValue("participantId"));
@@ -39,11 +45,27 @@ public class ParticipantLauncher {
@SuppressWarnings("unchecked")
Class<? extends AbstractParticipantService> clazz =
(Class<? extends AbstractParticipantService>) Class.forName(participantClass);
- AbstractParticipantService containerParticipant =
+ final AbstractParticipantService containerParticipant =
clazz.getConstructor(HelixConnection.class, ClusterId.class, ParticipantId.class)
.newInstance(connection, clusterId, participantId);
containerParticipant.startAsync();
containerParticipant.awaitRunning(60, TimeUnit.SECONDS);
+ containerParticipant
+ .getParticipant()
+ .getMessagingService()
+ .registerMessageHandlerFactory(MessageType.SHUTDOWN.toString(),
+ new ShutdownMessageHandlerFactory(containerParticipant, connection));
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Received a shutdown signal. Stopping participant");
+ containerParticipant.stopAsync();
+ containerParticipant.awaitTerminated();
+ connection.disconnect();
+ }
+ }) {
+
+ });
Thread.currentThread().join();
} catch (Exception e) {
e.printStackTrace();
@@ -57,4 +79,57 @@ public class ParticipantLauncher {
}
}
+
+ public static class ShutdownMessageHandlerFactory implements MessageHandlerFactory {
+ private final AbstractParticipantService _service;
+ private final HelixConnection _connection;
+
+ public ShutdownMessageHandlerFactory(AbstractParticipantService service,
+ HelixConnection connection) {
+ _service = service;
+ _connection = connection;
+ }
+
+ @Override
+ public MessageHandler createHandler(Message message, NotificationContext context) {
+ return new ShutdownMessageHandler(_service, _connection, message, context);
+ }
+
+ @Override
+ public String getMessageType() {
+ return MessageType.SHUTDOWN.toString();
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ }
+
+ public static class ShutdownMessageHandler extends MessageHandler {
+ private final AbstractParticipantService _service;
+ private final HelixConnection _connection;
+
+ public ShutdownMessageHandler(AbstractParticipantService service, HelixConnection connection,
+ Message message, NotificationContext context) {
+ super(message, context);
+ _service = service;
+ _connection = connection;
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException {
+ LOG.info("Received a shutdown message. Trying to shut down.");
+ _service.stopAsync();
+ _service.awaitTerminated();
+ _connection.disconnect();
+ System.exit(1);
+ return null;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type) {
+ }
+
+ }
}