You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:04:56 UTC

[13/50] [abbrv] git commit: Shutdown message

Shutdown message


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c22cdd98
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c22cdd98
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c22cdd98

Branch: refs/heads/master
Commit: c22cdd98a51595d796925a682fcead7f0a90f881
Parents: b3dacb7
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Feb 21 18:13:47 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Feb 21 18:13:47 2014 -0800

----------------------------------------------------------------------
 .../stages/ContainerProvisioningStage.java      | 22 +++++-
 .../manager/zk/AbstractParticipantService.java  |  2 +-
 .../java/org/apache/helix/model/Message.java    |  3 +-
 .../org/apache/helix/tools/ClusterSetup.java    |  2 +
 .../provisioning/yarn/ParticipantLauncher.java  | 81 +++++++++++++++++++-
 5 files changed, 102 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c22cdd98/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index f258525..bc3e0c6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
@@ -45,6 +46,8 @@ import org.apache.helix.controller.provisioner.ProvisionerRef;
 import org.apache.helix.controller.provisioner.TargetProvider;
 import org.apache.helix.controller.provisioner.TargetProviderResponse;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
 import org.apache.log4j.Logger;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -123,6 +126,11 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
               LOG.info("Participant " + participantId + " is ready, marking as CONNECTED");
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
                   ContainerState.CONNECTED);
+            } else if (!participant.isAlive() && ContainerState.HALTING.equals(containerState)) {
+              // Need to mark as connected only when the live instance is visible
+              LOG.info("Participant " + participantId + " is has been killed, marking as HALTED");
+              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+                  ContainerState.HALTED);
             }
           }
         }
@@ -262,9 +270,17 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
-              LOG.info("Container " + containerId + " stopped. Marking " + participant.getId());
-              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
-                  ContainerState.HALTED);
+              // Don't update the state here, wait for the live instance, but do send a shutdown
+              // message
+              LOG.info("Container " + containerId + " stopped for " + participant.getId());
+              if (participant.isAlive()) {
+                Message message = new Message(MessageType.SHUTDOWN, UUID.randomUUID().toString());
+                message.setTgtName(participant.getId().toString());
+                message.setTgtSessionId("*");
+                message.setMsgId(message.getId());
+                accessor.createProperty(
+                    keyBuilder.message(participant.getId().toString(), message.getId()), message);
+              }
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/c22cdd98/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
index f515092..49a7159 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
@@ -111,7 +111,7 @@ public abstract class AbstractParticipantService extends AbstractService {
    * Get an instantiated participant instance.
    * @return HelixParticipant
    */
-  protected HelixParticipant getParticipant() {
+  public HelixParticipant getParticipant() {
     return _participant;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c22cdd98/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index d465a80..dcd77d9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -60,7 +60,8 @@ public class Message extends HelixProperty {
     CONTROLLER_MSG,
     TASK_REPLY,
     NO_OP,
-    PARTICIPANT_ERROR_REPORT
+    PARTICIPANT_ERROR_REPORT,
+    SHUTDOWN
   };
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/c22cdd98/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 0247846..d97a853 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -166,6 +166,8 @@ public class ClusterSetup {
 
     addStateModelDef(clusterName, "Task",
         new StateModelDefinition(StateModelConfigGenerator.generateConfigForTaskStateModel()));
+    addStateModelDef(clusterName, "StatelessService", new StateModelDefinition(
+        StateModelConfigGenerator.generateConfigForStatelessService()));
   }
 
   public void activateCluster(String clusterName, String grandCluster, boolean enable) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c22cdd98/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
index e9b6795..1a21a71 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
@@ -7,13 +7,19 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
 import org.apache.helix.HelixConnection;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.manager.zk.AbstractParticipantService;
 import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
 import org.apache.log4j.Logger;
+
 /**
- * 
  * Main class that invokes the Participant Api
  */
 public class ParticipantLauncher {
@@ -31,7 +37,7 @@ public class ParticipantLauncher {
     try {
       CommandLine cliParser = new GnuParser().parse(opts, args);
       String zkAddress = cliParser.getOptionValue("zkAddress");
-      HelixConnection connection = new ZkHelixConnection(zkAddress);
+      final HelixConnection connection = new ZkHelixConnection(zkAddress);
       connection.connect();
       ClusterId clusterId = ClusterId.from(cliParser.getOptionValue("cluster"));
       ParticipantId participantId = ParticipantId.from(cliParser.getOptionValue("participantId"));
@@ -39,11 +45,27 @@ public class ParticipantLauncher {
       @SuppressWarnings("unchecked")
       Class<? extends AbstractParticipantService> clazz =
           (Class<? extends AbstractParticipantService>) Class.forName(participantClass);
-      AbstractParticipantService containerParticipant =
+      final AbstractParticipantService containerParticipant =
           clazz.getConstructor(HelixConnection.class, ClusterId.class, ParticipantId.class)
               .newInstance(connection, clusterId, participantId);
       containerParticipant.startAsync();
       containerParticipant.awaitRunning(60, TimeUnit.SECONDS);
+      containerParticipant
+          .getParticipant()
+          .getMessagingService()
+          .registerMessageHandlerFactory(MessageType.SHUTDOWN.toString(),
+              new ShutdownMessageHandlerFactory(containerParticipant, connection));
+      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+        @Override
+        public void run() {
+          LOG.info("Received a shutdown signal. Stopping participant");
+          containerParticipant.stopAsync();
+          containerParticipant.awaitTerminated();
+          connection.disconnect();
+        }
+      }) {
+
+      });
       Thread.currentThread().join();
     } catch (Exception e) {
       e.printStackTrace();
@@ -57,4 +79,57 @@ public class ParticipantLauncher {
     }
 
   }
+
+  public static class ShutdownMessageHandlerFactory implements MessageHandlerFactory {
+    private final AbstractParticipantService _service;
+    private final HelixConnection _connection;
+
+    public ShutdownMessageHandlerFactory(AbstractParticipantService service,
+        HelixConnection connection) {
+      _service = service;
+      _connection = connection;
+    }
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
+      return new ShutdownMessageHandler(_service, _connection, message, context);
+    }
+
+    @Override
+    public String getMessageType() {
+      return MessageType.SHUTDOWN.toString();
+    }
+
+    @Override
+    public void reset() {
+    }
+
+  }
+
+  public static class ShutdownMessageHandler extends MessageHandler {
+    private final AbstractParticipantService _service;
+    private final HelixConnection _connection;
+
+    public ShutdownMessageHandler(AbstractParticipantService service, HelixConnection connection,
+        Message message, NotificationContext context) {
+      super(message, context);
+      _service = service;
+      _connection = connection;
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() throws InterruptedException {
+      LOG.info("Received a shutdown message. Trying to shut down.");
+      _service.stopAsync();
+      _service.awaitTerminated();
+      _connection.disconnect();
+      System.exit(1);
+      return null;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+    }
+
+  }
 }