You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/07/14 03:33:40 UTC

incubator-twill git commit: TWILL-116 Add TwillRunnable instances lifecycle management.

Repository: incubator-twill
Updated Branches:
  refs/heads/master d181b7cef -> d4a1508ee


TWILL-116 Add TwillRunnable instances lifecycle management.

Add support to restart certain or all instances of runnables in a Twill application.
See proposed design attached to the JIRA: https://issues.apache.org/jira/browse/TWILL-116

Summary of changes:
1. Add new APIs to TwillController to restart all or certain instances of runnables.
2. Support message handler in ApplicationMasterService for restart instances.
3. Modify RunningContainers to launch new container using same instance id to support restart for a runnable.
4. Add validation for instance ids to be restarted.

This closes #52 on GitHub.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/d4a1508e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/d4a1508e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/d4a1508e

Branch: refs/heads/master
Commit: d4a1508eeacbebdff5263b1ecad182775f27162a
Parents: d181b7c
Author: hsaputra <hs...@apache.org>
Authored: Fri Jun 26 22:19:24 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Jul 13 18:33:30 2015 -0700

----------------------------------------------------------------------
 .../org/apache/twill/api/TwillController.java   | 28 +++++++
 .../twill/internal/AbstractTwillController.java | 81 +++++++++++++++++++-
 .../internal/AbstractZKServiceController.java   |  1 +
 .../org/apache/twill/internal/Constants.java    |  5 ++
 .../org/apache/twill/internal/ZKMessages.java   |  2 +-
 .../apache/twill/internal/state/Message.java    |  3 +-
 .../twill/internal/state/SystemMessages.java    | 27 ++++++-
 .../yarn/Hadoop21YarnContainerInfo.java         |  2 +-
 .../org/apache/twill/internal/ServiceMain.java  |  2 +-
 .../appmaster/ApplicationMasterService.java     | 79 +++++++++++++++++++
 .../appmaster/RunnableContainerRequest.java     |  1 +
 .../internal/appmaster/RunningContainers.java   | 65 ++++++++++------
 .../apache/twill/yarn/EchoServerTestRun.java    | 70 +++++++++++++++++
 13 files changed, 338 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-api/src/main/java/org/apache/twill/api/TwillController.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillController.java b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
index b84f817..08206f5 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillController.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
@@ -21,6 +21,8 @@ import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.discovery.Discoverable;
 import org.apache.twill.discovery.ServiceDiscovered;
 
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Future;
 import javax.annotation.Nullable;
 
@@ -61,4 +63,30 @@ public interface TwillController extends ServiceController {
    */
   @Nullable
   ResourceReport getResourceReport();
+
+  /**
+   * Restart all instances of a particular {@link TwillRunnable}.
+   *
+   * @param runnable The name of the runnable to restart.
+   * @return A {@link Future} that will be completed when the restart operation has been done.
+   */
+  Future<String> restartAllInstances(String runnable);
+
+  /**
+   * Restart specific instances of one or more {@link TwillRunnable}s.
+   *
+   * @param runnableToInstanceIds A map of runnable name to the set of instance IDs to be restarted.
+   * @return A {@link Future} that will be completed when the restart operation has been done.
+   */
+  Future<Set<String>> restartInstances(Map<String, ? extends Set<Integer>> runnableToInstanceIds);
+
+  /**
+   * Restart specific instances of a single {@link TwillRunnable}.
+   *
+   * @param runnable The name of the runnable to restart.
+   * @param instanceId The first instance id to be restarted.
+   * @param moreInstanceIds Optional additional instance ids to be restarted.
+   * @return A {@link Future} that will be completed when the restart operation has been done.
+   */
+  Future<String> restartInstances(String runnable, int instanceId, int... moreInstanceIds);
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index bf20616..41a044b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -18,12 +18,22 @@
 package org.apache.twill.internal;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
 import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.api.logging.LogThrowable;
@@ -35,6 +45,7 @@ import org.apache.twill.internal.json.LogEntryDecoder;
 import org.apache.twill.internal.json.LogThrowableCodec;
 import org.apache.twill.internal.json.StackTraceElementCodec;
 import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
+import org.apache.twill.internal.state.Message;
 import org.apache.twill.internal.state.SystemMessages;
 import org.apache.twill.kafka.client.FetchedMessage;
 import org.apache.twill.kafka.client.KafkaClientService;
@@ -44,8 +55,11 @@ import org.apache.twill.zookeeper.ZKClients;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -55,6 +69,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 public abstract class AbstractTwillController extends AbstractZKServiceController implements TwillController {
 
   private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class);
+  private static final Gson GSON = new Gson();
 
   private final Queue<LogHandler> logHandlers;
   private final KafkaClientService kafkaClient;
@@ -63,7 +78,7 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
 
   public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
     super(runId, zkClient);
-    this.logHandlers = new ConcurrentLinkedQueue<LogHandler>();
+    this.logHandlers = new ConcurrentLinkedQueue<>();
     this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
     this.discoveryServiceClient = new ZKDiscoveryService(zkClient);
     Iterables.addAll(this.logHandlers, logHandlers);
@@ -109,6 +124,70 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
     return sendMessage(SystemMessages.setInstances(runnable, newCount), newCount);
   }
 
+  @Override
+  public final ListenableFuture<String> restartAllInstances(String runnableName) {
+    Command updateStateCommand = Command.Builder.of(Constants.RESTART_ALL_RUNNABLE_INSTANCES).
+      build();
+    Message message = SystemMessages.updateRunnableInstances(updateStateCommand, runnableName);
+    return sendMessage(message, updateStateCommand.getCommand());
+  }
+
+  @Override
+  public final ListenableFuture<Set<String>> restartInstances(Map<String,
+      ? extends Set<Integer>> runnableToInstanceIds) {
+    Map<String, String> runnableToStringInstanceIds =
+      Maps.transformEntries(runnableToInstanceIds, new Maps.EntryTransformer<String, Set<Integer>, String>() {
+        @Override
+        public String transformEntry(String runnableName, Set<Integer> instanceIds) {
+          validateInstanceIds(runnableName, instanceIds);
+          return GSON.toJson(instanceIds, new TypeToken<Set<Integer>>() {}.getType());
+        }
+      });
+    Command updateStateCommand = Command.Builder.of(Constants.RESTART_RUNNABLES_INSTANCES)
+      .addOptions(runnableToStringInstanceIds)
+      .build();
+    Message message = SystemMessages.updateRunnablesInstances(updateStateCommand);
+
+    return sendMessage(message, runnableToInstanceIds.keySet());
+  }
+
+  @Override
+  public ListenableFuture<String> restartInstances(final String runnable, int instanceId, int... moreInstanceIds) {
+    Set<Integer> instanceIds = Sets.newLinkedHashSet();
+    instanceIds.add(instanceId);
+    for (int id : moreInstanceIds) {
+      instanceIds.add(id);
+    }
+
+    return Futures.transform(restartInstances(ImmutableMap.of(runnable, instanceIds)),
+                             new Function<Set<String>, String>() {
+      public String apply(Set<String> input) {
+        return runnable;
+      }
+    });
+  }
+
+  private void validateInstanceIds(String runnable, Set<Integer> instanceIds) {
+    ResourceReport resourceReport = getResourceReport();
+    if (resourceReport == null) {
+      throw new IllegalStateException("Unable to get resource report since application has not started.");
+    }
+    Collection<TwillRunResources> runnableResources = resourceReport.getRunnableResources(runnable);
+    if (runnableResources == null) {
+      throw new RuntimeException("Unable to verify run resources for runnable " + runnable);
+    }
+    Set<Integer> existingInstanceIds = Sets.newHashSet();
+    for (TwillRunResources twillRunResources : runnableResources) {
+      existingInstanceIds.add(twillRunResources.getInstanceId());
+    }
+    LOG.info("Existing instance ids: {}", existingInstanceIds);
+    for (int instanceId : instanceIds) {
+      if (!existingInstanceIds.contains(instanceId)) {
+        throw new IllegalArgumentException("Unable to find instance id " + instanceId + " for " + runnable);
+      }
+    }
+  }
+
   private static final class LogMessageCallback implements KafkaConsumer.MessageCallback {
 
     private static final Gson GSON = new GsonBuilder()

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
index 6d95009..0cf92ea 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.twill.api.Command;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.ServiceController;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index 64b029d..e3a2194 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -47,6 +47,11 @@ public final class Constants {
 
   public static final String CLASSPATH = "classpath";
   public static final String APPLICATION_CLASSPATH = "application-classpath";
+
+  /** Command names used in messages for runnable instance lifecycle (restart) operations. */
+  public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
+  public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
+
   /**
    * Constants for names of internal files that are shared between client, AM and containers.
    */

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java b/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java
index b7905d9..cfcc7e7 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java
@@ -28,7 +28,7 @@ import org.apache.twill.zookeeper.ZKOperations;
 import org.apache.zookeeper.CreateMode;
 
 /**
- *
+ * Helper class to send messages to remote instances using Apache Zookeeper watch mechanism.
  */
 public final class ZKMessages {
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/Message.java b/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
index c6944fd..1c758c1 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
@@ -38,7 +38,8 @@ public interface Message {
   enum Scope {
     APPLICATION,
     ALL_RUNNABLE,
-    RUNNABLE
+    RUNNABLE,
+    RUNNABLES
   }
 
   Type getType();

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
index 73683bd..4c4bb8b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
@@ -17,9 +17,10 @@
  */
 package org.apache.twill.internal.state;
 
-import com.google.common.base.Preconditions;
 import org.apache.twill.api.Command;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Collection of predefined system messages.
  */
@@ -43,6 +44,30 @@ public final class SystemMessages {
                              Command.Builder.of("instances").addOption("count", Integer.toString(instances)).build());
   }
 
+  /**
+   * Helper method to create a system {@link Message} for updating the instances of a runnable.
+   *
+   * @param updateCommand The {@link Command} to be added to the message.
+   * @param runnableName The name of the runnable to be restarted.
+   * @return An instance of System {@link Message} to restart runnable instances.
+   */
+  public static Message updateRunnableInstances(Command updateCommand, String runnableName) {
+    Preconditions.checkNotNull(updateCommand);
+    Preconditions.checkNotNull(runnableName);
+    return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLE, runnableName, updateCommand);
+  }
+
+  /**
+   * Helper method to get System {@link Message} for restarting certain instances from certain runnables.
+   *
+   * @param updateCommand The {@link Command} to be added to the message.
+   * @return An instance of System {@link Message} to restart runnables' instances.
+   */
+  public static Message updateRunnablesInstances(Command updateCommand) {
+    Preconditions.checkNotNull(updateCommand);
+    return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLES, null, updateCommand);
+  }
+
   private SystemMessages() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
index 86903c1..8571933 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
@@ -24,7 +24,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 /**
- *
+ * Implementation of {@link YarnContainerInfo} for Apache Hadoop 2.1.0 or above.
  */
 public final class Hadoop21YarnContainerInfo implements YarnContainerInfo {
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index d7ef72b..f7bea24 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -228,7 +228,7 @@ public abstract class ServiceMain {
   }
 
   /**
-   * Override to return the right log level for the service
+   * Override to return the right log level for the service.
    *
    * @param logger the {@link Logger} instance of the service context.
    * @return String of log level based on {@code slf4j} log levels.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 5c89a79..f76cd0b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -20,14 +20,17 @@ package org.apache.twill.internal.appmaster;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
+import com.google.common.collect.DiscreteDomains;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
+import com.google.common.collect.Ranges;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.google.common.io.InputSupplier;
@@ -35,6 +38,7 @@ import com.google.common.reflect.TypeToken;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
@@ -92,6 +96,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 
 /**
  * The class that acts as {@code ApplicationMaster} for Twill applications.
@@ -99,6 +104,7 @@ import java.util.concurrent.TimeUnit;
 public final class ApplicationMasterService extends AbstractYarnTwillService implements Supplier<ResourceReport> {
 
   private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterService.class);
+  private static final Gson GSON = new Gson();
 
   // Copied from org.apache.hadoop.yarn.security.AMRMTokenIdentifier.KIND_NAME since it's missing in Hadoop-2.0
   private static final Text AMRM_TOKEN_KIND_NAME = new Text("YARN_AM_RM_TOKEN");
@@ -288,6 +294,10 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       return result;
     }
 
+    if (handleRestartRunnablesInstances(message, completion)) {
+      return result;
+    }
+
     // Replicate messages to all runnables
     if (message.getScope() == Message.Scope.ALL_RUNNABLE) {
       runningContainers.sendToAll(message, completion);
@@ -787,4 +797,73 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
     capability.setMemory(resourceSpec.getMemorySize());
     return capability;
   }
+
+  /**
+   * Attempt to restart some instances from a runnable or some runnables.
+   * @return {@code true} if the message requests restarting some instances and {@code false} otherwise.
+   */
+  private boolean handleRestartRunnablesInstances(final Message message, final Runnable completion) {
+    LOG.debug("Check if it should process a restart runnable instances.");
+
+    if (message.getType() != Message.Type.SYSTEM) {
+      return false;
+    }
+
+    Message.Scope messageScope = message.getScope();
+    if (messageScope != Message.Scope.RUNNABLE && messageScope != Message.Scope.RUNNABLES) {
+      return false;
+    }
+
+    Command requestCommand = message.getCommand();
+    if (!Constants.RESTART_ALL_RUNNABLE_INSTANCES.equals(requestCommand.getCommand()) &&
+      !Constants.RESTART_RUNNABLES_INSTANCES.equals(requestCommand.getCommand())) {
+      return false;
+    }
+
+    LOG.debug("Processing restart runnable instances message {}.", message);
+
+    if (!Strings.isNullOrEmpty(message.getRunnableName()) && message.getScope() == Message.Scope.RUNNABLE) {
+      // ... for a runnable ...
+      String runnableName = message.getRunnableName();
+      LOG.debug("Start restarting all runnable {} instances.", runnableName);
+      restartRunnableInstances(runnableName, null);
+    } else {
+      // ... or maybe some runnables
+      for (Map.Entry<String, String> option : requestCommand.getOptions().entrySet()) {
+        String runnableName = option.getKey();
+        Set<Integer> restartedInstanceIds = GSON.fromJson(option.getValue(),
+                                                           new TypeToken<Set<Integer>>() {}.getType());
+
+        LOG.debug("Start restarting runnable {} instances {}", runnableName, restartedInstanceIds);
+        restartRunnableInstances(runnableName, restartedInstanceIds);
+      }
+    }
+
+    completion.run();
+    return true;
+  }
+
+  /**
+   * Helper method to restart instances of runnables.
+   */
+  private void restartRunnableInstances(String runnableName, @Nullable Set<Integer> instanceIds) {
+    LOG.debug("Begin restart runnable {} instances.", runnableName);
+
+    Set<Integer> instancesToRemove = instanceIds;
+    if (instancesToRemove == null) {
+      instancesToRemove = Ranges.closedOpen(0, runningContainers.count(runnableName)).asSet(DiscreteDomains.integers());
+    }
+
+    for (int instanceId : instancesToRemove) {
+      LOG.debug("Remove instance {} for runnable {}", instanceId, runnableName);
+      try {
+        runningContainers.removeById(runnableName, instanceId);
+      } catch (Exception ex) {
+        // could be thrown if the container already stopped.
+        LOG.info("Exception thrown when stopping instance {} probably already stopped.", instanceId);
+      }
+    }
+    LOG.info("Restarting instances {} for runnable {}", instancesToRemove, runnableName);
+    runnableContainerRequests.add(createRunnableContainerRequest(runnableName, instancesToRemove.size()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
index 2105629..f065380 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -26,6 +26,7 @@ import org.apache.twill.api.TwillSpecification;
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index 4a56229..d957768 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -121,6 +121,9 @@ final class RunningContainers {
     }
   }
 
+  /**
+   * Start a container for a runnable.
+   */
   void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher) {
     containerLock.lock();
     try {
@@ -186,32 +189,47 @@ final class RunningContainers {
         LOG.warn("No running container found for {}", runnableName);
         return;
       }
+      removeInstanceById(runnableName, maxInstanceId);
+    } finally {
+      containerLock.unlock();
+    }
+  }
 
-      String lastContainerId = null;
-      TwillContainerController lastController = null;
+  /**
+   * Stop and remove the container of a runnable that has the given instance id.
+   */
+  void removeById(String runnableName, int instanceId) {
+    containerLock.lock();
+    try {
+      removeInstanceById(runnableName, instanceId);
+    } finally {
+      containerLock.unlock();
+    }
+  }
 
-      // Find the controller with the maxInstanceId
-      for (Map.Entry<String, TwillContainerController> entry : containers.row(runnableName).entrySet()) {
-        if (getInstanceId(entry.getValue().getRunId()) == maxInstanceId) {
-          lastContainerId = entry.getKey();
-          lastController = entry.getValue();
-          break;
-        }
+  private void removeInstanceById(String runnableName, int instanceId) {
+    String containerId = null;
+    TwillContainerController controller = null;
+
+    // Find the controller with particular instance id.
+    for (Map.Entry<String, TwillContainerController> entry : containers.row(runnableName).entrySet()) {
+      if (getInstanceId(entry.getValue().getRunId()) == instanceId) {
+        containerId = entry.getKey();
+        controller = entry.getValue();
+        break;
       }
+    }
 
-      Preconditions.checkState(lastContainerId != null,
-                               "No container found for {} with instanceId = {}", runnableName, maxInstanceId);
+    Preconditions.checkState(containerId != null,
+                             "No container found for {} with instanceId = {}", runnableName, instanceId);
 
-      LOG.info("Stopping service: {} {}", runnableName, lastController.getRunId());
-      lastController.stopAndWait();
-      containers.remove(runnableName, lastContainerId);
-      removeContainerInfo(lastContainerId);
-      removeInstanceId(runnableName, maxInstanceId);
-      resourceReport.removeRunnableResources(runnableName, lastContainerId);
-      containerChange.signalAll();
-    } finally {
-      containerLock.unlock();
-    }
+    LOG.info("Stopping service: {} {}", runnableName, controller.getRunId());
+    controller.stopAndWait();
+    containers.remove(runnableName, containerId);
+    removeContainerInfo(containerId);
+    removeInstanceId(runnableName, instanceId);
+    resourceReport.removeRunnableResources(runnableName, containerId);
+    containerChange.signalAll();
   }
 
   /**
@@ -446,6 +464,9 @@ final class RunningContainers {
     return instanceId;
   }
 
+  /**
+   * Remove instance id for a given runnable.
+   */
   private void removeInstanceId(String runnableName, int instanceId) {
     BitSet instances = runnableInstances.get(runnableName);
     if (instances == null) {
@@ -469,7 +490,7 @@ final class RunningContainers {
   }
 
   /**
-   * Returns nnumber of running instances for the given runnable.
+   * Returns number of running instances for the given runnable.
    */
   private int getRunningInstances(String runableName) {
     BitSet instances = runnableInstances.get(runableName);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
index d635af1..0a8414e 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -18,9 +18,14 @@
 package org.apache.twill.yarn;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Maps;
 import com.google.common.io.LineReader;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.ResourceSpecification;
 import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
 import org.apache.twill.api.TwillRunner;
 import org.apache.twill.api.TwillRunnerService;
 import org.apache.twill.api.logging.PrinterLogHandler;
@@ -37,10 +42,13 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.net.Socket;
 import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 
 /**
  * Using echo server to test various behavior of YarnTwillService.
@@ -107,6 +115,22 @@ public final class EchoServerTestRun extends BaseYarnTest {
     controller.changeInstances("EchoServer", 2);
     Assert.assertTrue(waitForSize(echoServices, 2, 120));
 
+    // Test restart on instances for runnable
+    Map<Integer, String> instanceIdToContainerId = Maps.newHashMap();
+    ResourceReport report = waitForAfterRestartResourceReport(controller, "EchoServer", 30L, TimeUnit.SECONDS, 2, null);
+    Assert.assertTrue(report != null);
+    Collection<TwillRunResources> runResources = report.getRunnableResources("EchoServer");
+    for (TwillRunResources twillRunResources : runResources) {
+      instanceIdToContainerId.put(twillRunResources.getInstanceId(), twillRunResources.getContainerId());
+    }
+
+    controller.restartAllInstances("EchoServer");
+    Assert.assertTrue(waitForSize(echoServices, 2, 120));
+
+    report = waitForAfterRestartResourceReport(controller, "EchoServer", 30L, TimeUnit.SECONDS, 2,
+                                               instanceIdToContainerId);
+    Assert.assertTrue(report != null);
+
     // Make sure still only one app is running
     Iterable<TwillRunner.LiveInfo> apps = runner.lookupLive();
     Assert.assertTrue(waitForSize(apps, 1, 120));
@@ -132,4 +156,50 @@ public final class EchoServerTestRun extends BaseYarnTest {
     // Sleep a bit before exiting.
     TimeUnit.SECONDS.sleep(2);
   }
+
+  /**
+   *  Need helper method here to wait for getting resource report because {@link TwillController#getResourceReport()}
+   *  could return null if the application has not fully started.
+   *
+   *  This method helps validate restart scenario.
+   *
+   *  If instanceIdToContainerId is given, poll (rather than sleep for a long time) until every instance reports a
+   *  container id different from the one recorded before. Otherwise just return the valid resource report.
+   */
+  @Nullable
+  private ResourceReport waitForAfterRestartResourceReport(TwillController controller, String runnable, long timeout,
+                                                           TimeUnit timeoutUnit, int numOfResources,
+                                                           @Nullable Map<Integer, String> instanceIdToContainerId) {
+    Stopwatch stopwatch = new Stopwatch();
+    stopwatch.start();
+    do {
+      ResourceReport report = controller.getResourceReport();
+      if (report == null || report.getRunnableResources(runnable) == null) {
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      } else if (report.getRunnableResources(runnable) == null ||
+          report.getRunnableResources(runnable).size() != numOfResources) {
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      } else {
+        if (instanceIdToContainerId == null) {
+          return report;
+        }
+        Collection<TwillRunResources> runResources = report.getRunnableResources(runnable);
+        boolean isSameContainer = false;
+        for (TwillRunResources twillRunResources : runResources) {
+          int instanceId = twillRunResources.getInstanceId();
+          if (twillRunResources.getContainerId().equals(instanceIdToContainerId.get(instanceId))) {
+            // found same container id lets wait again.
+            isSameContainer = true;
+            break;
+          }
+        }
+        if (!isSameContainer) {
+          LOG.error("Unable to get different container ids for restart.");
+          return report;
+        }
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      }
+    } while (stopwatch.elapsedTime(timeoutUnit) < timeout);
+    return null;
+  }
 }