You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/07/30 21:57:07 UTC

incubator-twill git commit: Potential race condition when restart all is called for a Twill runnable.

Repository: incubator-twill
Updated Branches:
  refs/heads/master d835cafc9 -> 0c20804c2


Potential race condition when restart all is called for a Twill runnable.

If a restart of all instances is requested for a TwillRunnable, there could be a race condition between checking
the provisioned containers and the pending container requests, which could cause the TwillApplication to exit.

This PR contains the following changes:
-) Change the container request queue to a ConcurrentLinkedQueue, since it is accessed by multiple threads.
-) Add a new volatile flag in RunnableContainerRequest to indicate whether it is ready to be provisioned.
-) Add the container requests for a restart before removing the running instances, instead of after.
-) Move execution of the restart to a thread in the instance-change executor.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/0c20804c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/0c20804c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/0c20804c

Branch: refs/heads/master
Commit: 0c20804c2001591dec590461a26d3b594601f4a9
Parents: d835caf
Author: hsaputra <hs...@apache.org>
Authored: Wed Jul 29 16:34:12 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Jul 30 12:56:58 2015 -0700

----------------------------------------------------------------------
 .../appmaster/ApplicationMasterService.java     | 82 ++++++++++++++------
 .../appmaster/RunnableContainerRequest.java     | 16 ++++
 2 files changed, 76 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/0c20804c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index cbf013b..818db05 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -93,6 +93,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -364,7 +365,15 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
       // If nothing is in provisioning, and no pending request, move to next one
       while (provisioning.isEmpty() && currentRequest == null && !runnableContainerRequests.isEmpty()) {
-        currentRequest = runnableContainerRequests.peek().takeRequest();
+        RunnableContainerRequest runnableContainerRequest = runnableContainerRequests.peek();
+        if (!runnableContainerRequest.isReadyToBeProvisioned()) {
+          // take it out from queue and put it back at the end for second chance.
+          runnableContainerRequest = runnableContainerRequests.poll();
+          runnableContainerRequests.add(runnableContainerRequest);
+
+          continue;
+        }
+        currentRequest = runnableContainerRequest.takeRequest();
         if (currentRequest == null) {
           // All different types of resource request from current order is done, move to next one
           // TODO: Need to handle order type as well
@@ -524,7 +533,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
   private Queue<RunnableContainerRequest> initContainerRequests() {
     // Orderly stores container requests.
-    Queue<RunnableContainerRequest> requests = Lists.newLinkedList();
+    Queue<RunnableContainerRequest> requests = new ConcurrentLinkedQueue<>();
     // For each order in the twillSpec, create container request for runnables, depending on Placement policy.
     for (TwillSpecification.Order order : twillSpec.getOrders()) {
       Set<String> distributedRunnables = placementPolicyManager.getDistributedRunnables(order.getNames());
@@ -752,6 +761,12 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
   private RunnableContainerRequest createRunnableContainerRequest(final String runnableName,
                                                                   final int numberOfInstances) {
+    return createRunnableContainerRequest(runnableName, numberOfInstances, true);
+  }
+
+  private RunnableContainerRequest createRunnableContainerRequest(final String runnableName,
+                                                                  final int numberOfInstances,
+                                                                  final boolean isProvisioned) {
     // Find the current order of the given runnable in order to create a RunnableContainerRequest.
     TwillSpecification.Order order = Iterables.find(twillSpec.getOrders(), new Predicate<TwillSpecification.Order>() {
       @Override
@@ -775,7 +790,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
       addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
     }
-    return new RunnableContainerRequest(order.getType(), requestsMap);
+    return new RunnableContainerRequest(order.getType(), requestsMap, isProvisioned);
   }
 
   private Runnable getMessageCompletion(final String messageId, final SettableFuture<String> future) {
@@ -826,7 +841,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       // ... for a runnable ...
       String runnableName = message.getRunnableName();
       LOG.debug("Start restarting all runnable {} instances.", runnableName);
-      restartRunnableInstances(runnableName, null);
+      restartRunnableInstances(runnableName, null, completion);
     } else {
       // ... or maybe some runnables
       for (Map.Entry<String, String> option : requestCommand.getOptions().entrySet()) {
@@ -835,35 +850,58 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
                                                           new TypeToken<Set<Integer>>() {}.getType());
 
         LOG.debug("Start restarting runnable {} instances {}", runnableName, restartedInstanceIds);
-        restartRunnableInstances(runnableName, restartedInstanceIds);
+        restartRunnableInstances(runnableName, restartedInstanceIds, completion);
       }
     }
 
-    completion.run();
     return true;
   }
 
   /**
    * Helper method to restart instances of runnables.
    */
-  private void restartRunnableInstances(String runnableName, @Nullable Set<Integer> instanceIds) {
-    LOG.debug("Begin restart runnable {} instances.", runnableName);
+  private void restartRunnableInstances(String runnableName, @Nullable Set<Integer> instanceIds,
+                                        final Runnable completion) {
+    Runnable restartInstancesRunnable = createRestartInstancesRunnable(runnableName, instanceIds, completion);
+    instanceChangeExecutor.execute(restartInstancesRunnable);
+  }
 
-    Set<Integer> instancesToRemove = instanceIds;
-    if (instancesToRemove == null) {
-      instancesToRemove = Ranges.closedOpen(0, runningContainers.count(runnableName)).asSet(DiscreteDomains.integers());
-    }
+  /**
+   * Creates a Runnable for execution of restart instances request.
+   */
+  private Runnable createRestartInstancesRunnable(final String runnableName, @Nullable final Set<Integer> instanceIds,
+                                                  final Runnable completion) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        LOG.debug("Begin restart runnable {} instances.", runnableName);
 
-    for (int instanceId : instancesToRemove) {
-      LOG.debug("Remove instance {} for runnable {}", instanceId, runnableName);
-      try {
-        runningContainers.removeById(runnableName, instanceId);
-      } catch (Exception ex) {
-        // could be thrown if the container already stopped.
-        LOG.info("Exception thrown when stopping instance {} probably already stopped.", instanceId);
+        Set<Integer> instancesToRemove = instanceIds;
+        if (instancesToRemove == null) {
+          instancesToRemove =
+            Ranges.closedOpen(0, runningContainers.count(runnableName)).asSet(DiscreteDomains.integers());
+        }
+
+        LOG.info("Restarting instances {} for runnable {}", instancesToRemove, runnableName);
+        RunnableContainerRequest containerRequest =
+          createRunnableContainerRequest(runnableName, instancesToRemove.size(), false);
+        runnableContainerRequests.add(containerRequest);
+
+        for (int instanceId : instancesToRemove) {
+          LOG.debug("Remove instance {} for runnable {}", instanceId, runnableName);
+          try {
+            runningContainers.removeById(runnableName, instanceId);
+          } catch (Exception ex) {
+            // could be thrown if the container already stopped.
+            LOG.info("Exception thrown when stopping instance {} probably already stopped.", instanceId);
+          }
+        }
+
+        // set the container request to be ready
+        containerRequest.setReadyToBeProvisioned(true);
+
+        completion.run();
       }
-    }
-    LOG.info("Restarting instances {} for runnable {}", instancesToRemove, runnableName);
-    runnableContainerRequests.add(createRunnableContainerRequest(runnableName, instancesToRemove.size()));
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/0c20804c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
index 2105629..e001121 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -34,17 +34,33 @@ import java.util.Map;
 final class RunnableContainerRequest {
   private final TwillSpecification.Order.Type orderType;
   private final Iterator<Map.Entry<AllocationSpecification, Collection<RuntimeSpecification>>> requests;
+  private volatile boolean isReadyToBeProvisioned;
 
   RunnableContainerRequest(TwillSpecification.Order.Type orderType,
                            Map<AllocationSpecification, Collection<RuntimeSpecification>> requests) {
+    this(orderType, requests, true);
+  }
+
+  RunnableContainerRequest(TwillSpecification.Order.Type orderType,
+                           Map<AllocationSpecification, Collection<RuntimeSpecification>> requests,
+                           boolean isReadyToBeProvisioned) {
     this.orderType = orderType;
     this.requests = requests.entrySet().iterator();
+    this.isReadyToBeProvisioned = isReadyToBeProvisioned;
   }
 
   TwillSpecification.Order.Type getOrderType() {
     return orderType;
   }
 
+  public boolean isReadyToBeProvisioned() {
+    return isReadyToBeProvisioned;
+  }
+
+  public void setReadyToBeProvisioned(boolean isProvisioned) {
+    this.isReadyToBeProvisioned = isProvisioned;
+  }
+
   /**
    * Remove a resource request and return it.
    * @return The {@link Resource} and {@link Collection} of {@link RuntimeSpecification} or