You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/07/30 21:57:07 UTC
incubator-twill git commit: Potential race condition when restart all
is called for a Twill runnable.
Repository: incubator-twill
Updated Branches:
refs/heads/master d835cafc9 -> 0c20804c2
Potential race condition when restart all is called for a Twill runnable.
If a restart of all instances is requested for a TwillRunnable, there could be a race condition in checking the
provisioned and pending container requests that could cause the TwillApplication to exit.
This PR contains the following changes:
-) Change the container requests to be ConcurrentLinkedQueue since it is accessed by multiple threads.
-) Add new volatile flag in RunnableContainerRequest to indicate whether it is ready to be provisioned.
-) Move up adding container requests for restart before removing.
-) Move execution of restart to a thread in the instance change executor.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/0c20804c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/0c20804c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/0c20804c
Branch: refs/heads/master
Commit: 0c20804c2001591dec590461a26d3b594601f4a9
Parents: d835caf
Author: hsaputra <hs...@apache.org>
Authored: Wed Jul 29 16:34:12 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Jul 30 12:56:58 2015 -0700
----------------------------------------------------------------------
.../appmaster/ApplicationMasterService.java | 82 ++++++++++++++------
.../appmaster/RunnableContainerRequest.java | 16 ++++
2 files changed, 76 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/0c20804c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index cbf013b..818db05 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -93,6 +93,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -364,7 +365,15 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
// If nothing is in provisioning, and no pending request, move to next one
while (provisioning.isEmpty() && currentRequest == null && !runnableContainerRequests.isEmpty()) {
- currentRequest = runnableContainerRequests.peek().takeRequest();
+ RunnableContainerRequest runnableContainerRequest = runnableContainerRequests.peek();
+ if (!runnableContainerRequest.isReadyToBeProvisioned()) {
+ // take it out from queue and put it back at the end for second chance.
+ runnableContainerRequest = runnableContainerRequests.poll();
+ runnableContainerRequests.add(runnableContainerRequest);
+
+ continue;
+ }
+ currentRequest = runnableContainerRequest.takeRequest();
if (currentRequest == null) {
// All different types of resource request from current order is done, move to next one
// TODO: Need to handle order type as well
@@ -524,7 +533,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
private Queue<RunnableContainerRequest> initContainerRequests() {
// Orderly stores container requests.
- Queue<RunnableContainerRequest> requests = Lists.newLinkedList();
+ Queue<RunnableContainerRequest> requests = new ConcurrentLinkedQueue<>();
// For each order in the twillSpec, create container request for runnables, depending on Placement policy.
for (TwillSpecification.Order order : twillSpec.getOrders()) {
Set<String> distributedRunnables = placementPolicyManager.getDistributedRunnables(order.getNames());
@@ -752,6 +761,12 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
private RunnableContainerRequest createRunnableContainerRequest(final String runnableName,
final int numberOfInstances) {
+ return createRunnableContainerRequest(runnableName, numberOfInstances, true);
+ }
+
+ private RunnableContainerRequest createRunnableContainerRequest(final String runnableName,
+ final int numberOfInstances,
+ final boolean isProvisioned) {
// Find the current order of the given runnable in order to create a RunnableContainerRequest.
TwillSpecification.Order order = Iterables.find(twillSpec.getOrders(), new Predicate<TwillSpecification.Order>() {
@Override
@@ -775,7 +790,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
}
- return new RunnableContainerRequest(order.getType(), requestsMap);
+ return new RunnableContainerRequest(order.getType(), requestsMap, isProvisioned);
}
private Runnable getMessageCompletion(final String messageId, final SettableFuture<String> future) {
@@ -826,7 +841,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
// ... for a runnable ...
String runnableName = message.getRunnableName();
LOG.debug("Start restarting all runnable {} instances.", runnableName);
- restartRunnableInstances(runnableName, null);
+ restartRunnableInstances(runnableName, null, completion);
} else {
// ... or maybe some runnables
for (Map.Entry<String, String> option : requestCommand.getOptions().entrySet()) {
@@ -835,35 +850,58 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
new TypeToken<Set<Integer>>() {}.getType());
LOG.debug("Start restarting runnable {} instances {}", runnableName, restartedInstanceIds);
- restartRunnableInstances(runnableName, restartedInstanceIds);
+ restartRunnableInstances(runnableName, restartedInstanceIds, completion);
}
}
- completion.run();
return true;
}
/**
* Helper method to restart instances of runnables.
*/
- private void restartRunnableInstances(String runnableName, @Nullable Set<Integer> instanceIds) {
- LOG.debug("Begin restart runnable {} instances.", runnableName);
+ private void restartRunnableInstances(String runnableName, @Nullable Set<Integer> instanceIds,
+ final Runnable completion) {
+ Runnable restartInstancesRunnable = createRestartInstancesRunnable(runnableName, instanceIds, completion);
+ instanceChangeExecutor.execute(restartInstancesRunnable);
+ }
- Set<Integer> instancesToRemove = instanceIds;
- if (instancesToRemove == null) {
- instancesToRemove = Ranges.closedOpen(0, runningContainers.count(runnableName)).asSet(DiscreteDomains.integers());
- }
+ /**
+ * Creates a Runnable for execution of restart instances request.
+ */
+ private Runnable createRestartInstancesRunnable(final String runnableName, @Nullable final Set<Integer> instanceIds,
+ final Runnable completion) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("Begin restart runnable {} instances.", runnableName);
- for (int instanceId : instancesToRemove) {
- LOG.debug("Remove instance {} for runnable {}", instanceId, runnableName);
- try {
- runningContainers.removeById(runnableName, instanceId);
- } catch (Exception ex) {
- // could be thrown if the container already stopped.
- LOG.info("Exception thrown when stopping instance {} probably already stopped.", instanceId);
+ Set<Integer> instancesToRemove = instanceIds;
+ if (instancesToRemove == null) {
+ instancesToRemove =
+ Ranges.closedOpen(0, runningContainers.count(runnableName)).asSet(DiscreteDomains.integers());
+ }
+
+ LOG.info("Restarting instances {} for runnable {}", instancesToRemove, runnableName);
+ RunnableContainerRequest containerRequest =
+ createRunnableContainerRequest(runnableName, instancesToRemove.size(), false);
+ runnableContainerRequests.add(containerRequest);
+
+ for (int instanceId : instancesToRemove) {
+ LOG.debug("Remove instance {} for runnable {}", instanceId, runnableName);
+ try {
+ runningContainers.removeById(runnableName, instanceId);
+ } catch (Exception ex) {
+ // could be thrown if the container already stopped.
+ LOG.info("Exception thrown when stopping instance {} probably already stopped.", instanceId);
+ }
+ }
+
+ // set the container request to be ready
+ containerRequest.setReadyToBeProvisioned(true);
+
+ completion.run();
}
- }
- LOG.info("Restarting instances {} for runnable {}", instancesToRemove, runnableName);
- runnableContainerRequests.add(createRunnableContainerRequest(runnableName, instancesToRemove.size()));
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/0c20804c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
index 2105629..e001121 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -34,17 +34,33 @@ import java.util.Map;
final class RunnableContainerRequest {
private final TwillSpecification.Order.Type orderType;
private final Iterator<Map.Entry<AllocationSpecification, Collection<RuntimeSpecification>>> requests;
+ private volatile boolean isReadyToBeProvisioned;
RunnableContainerRequest(TwillSpecification.Order.Type orderType,
Map<AllocationSpecification, Collection<RuntimeSpecification>> requests) {
+ this(orderType, requests, true);
+ }
+
+ RunnableContainerRequest(TwillSpecification.Order.Type orderType,
+ Map<AllocationSpecification, Collection<RuntimeSpecification>> requests,
+ boolean isReadyToBeProvisioned) {
this.orderType = orderType;
this.requests = requests.entrySet().iterator();
+ this.isReadyToBeProvisioned = isReadyToBeProvisioned;
}
TwillSpecification.Order.Type getOrderType() {
return orderType;
}
+ public boolean isReadyToBeProvisioned() {
+ return isReadyToBeProvisioned;
+ }
+
+ public void setReadyToBeProvisioned(boolean isProvisioned) {
+ this.isReadyToBeProvisioned = isProvisioned;
+ }
+
/**
* Remove a resource request and return it.
* @return The {@link Resource} and {@link Collection} of {@link RuntimeSpecification} or