You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/22 08:06:45 UTC

[48/50] [abbrv] git commit: Workaround for YARN-314 by not mixing new container request with remove container request (request with container count = 0).

Workaround for YARN-314 by not mixing new container request with remove container request (request with container count = 0).

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/2c3cf396
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/2c3cf396
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/2c3cf396

Branch: refs/heads/site
Commit: 2c3cf39682415420a89f0360c0abe9a42fdc4abe
Parents: d6504eb
Author: Terence Yim <te...@continuuity.com>
Authored: Wed Apr 16 17:06:25 2014 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Apr 17 00:26:25 2014 -0700

----------------------------------------------------------------------
 .../internal/yarn/Hadoop20YarnAMClient.java     | 139 ++++-------
 .../internal/yarn/Hadoop21YarnAMClient.java     | 100 ++------
 .../appmaster/ApplicationMasterService.java     |   6 +-
 .../internal/yarn/AbstractYarnAMClient.java     | 229 +++++++++++++++++++
 .../twill/internal/yarn/YarnAMClient.java       |   4 +-
 .../apache/twill/yarn/ContainerSizeTestRun.java | 119 ++++++++++
 .../apache/twill/yarn/LogHandlerTestRun.java    |   3 +
 7 files changed, 420 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
index 68d073d..9b66f67 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -19,23 +19,18 @@ package org.apache.twill.internal.yarn;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.twill.internal.ProcessLauncher;
 import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
 import org.apache.twill.internal.yarn.ports.AMRMClient;
 import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
@@ -43,15 +38,13 @@ import org.apache.twill.internal.yarn.ports.AllocationResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-import java.net.URL;
 import java.util.List;
-import java.util.UUID;
+import javax.annotation.Nullable;
 
 /**
  *
  */
-public final class Hadoop20YarnAMClient extends AbstractIdleService implements YarnAMClient {
+public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
 
   private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
   private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
@@ -65,21 +58,13 @@ public final class Hadoop20YarnAMClient extends AbstractIdleService implements Y
     };
   }
 
-  private final ContainerId containerId;
-  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
   private final AMRMClient amrmClient;
   private final YarnNMClient nmClient;
-  private InetSocketAddress trackerAddr;
-  private URL trackerUrl;
   private Resource maxCapability;
   private Resource minCapability;
 
   public Hadoop20YarnAMClient(Configuration conf) {
-    String masterContainerId = System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV);
-    Preconditions.checkArgument(masterContainerId != null,
-                                "Missing %s from environment", ApplicationConstants.AM_CONTAINER_ID_ENV);
-    this.containerId = ConverterUtils.toContainerId(masterContainerId);
-    this.containerRequests = ArrayListMultimap.create();
+    super(ApplicationConstants.AM_CONTAINER_ID_ENV);
 
     this.amrmClient = new AMRMClientImpl(containerId.getApplicationAttemptId());
     this.amrmClient.init(conf);
@@ -110,107 +95,67 @@ public final class Hadoop20YarnAMClient extends AbstractIdleService implements Y
   }
 
   @Override
-  public ContainerId getContainerId() {
-    return containerId;
-  }
-
-  @Override
   public String getHost() {
     return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
   }
 
   @Override
-  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
-    this.trackerAddr = trackerAddr;
-    this.trackerUrl = trackerUrl;
-  }
-
-  @Override
-  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
-    AllocationResponse response = amrmClient.allocate(progress);
-    List<ProcessLauncher<YarnContainerInfo>> launchers
-      = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
-
-    for (Container container : response.getAllocatedContainers()) {
-      launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
+  protected Resource adjustCapability(Resource resource) {
+    int cores = YarnUtils.getVirtualCores(resource);
+    int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
+                                YarnUtils.getVirtualCores(minCapability));
+    // Try and set the virtual cores, which older versions of YARN don't support this.
+    if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
+      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
     }
 
-    if (!launchers.isEmpty()) {
-      handler.acquired(launchers);
-
-      // If no process has been launched through the given launcher, return the container.
-      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
-        // This cast always works.
-        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
-        if (!launcher.isLaunched()) {
-          Container container = launcher.getContainerInfo().getContainer();
-          LOG.info("Nothing to run in container, releasing it: {}", container);
-          amrmClient.releaseAssignedContainer(container.getId());
-        }
-      }
-    }
+    int updatedMemory = Math.min(resource.getMemory(), maxCapability.getMemory());
+    int minMemory = minCapability.getMemory();
+    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
 
-    List<YarnContainerStatus> completed = ImmutableList.copyOf(
-      Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
-    if (!completed.isEmpty()) {
-      handler.completed(completed);
+    if (resource.getMemory() != updatedMemory) {
+      resource.setMemory(updatedMemory);
+      LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
     }
+
+    return resource;
   }
 
   @Override
-  public ContainerRequestBuilder addContainerRequest(Resource capability) {
-    return addContainerRequest(capability, 1);
+  protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
+                                                               @Nullable String[] hosts, @Nullable String[] racks) {
+    return new AMRMClient.ContainerRequest(capability, hosts, racks, priority, 1);
   }
 
   @Override
-  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
-    return new ContainerRequestBuilder(adjustCapability(capability), count) {
-      @Override
-      public String apply() {
-        synchronized (Hadoop20YarnAMClient.this) {
-          String id = UUID.randomUUID().toString();
-
-          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
-          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
-
-          for (int i = 0; i < count; i++) {
-            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks,
-                                                                                  priority, 1);
-            containerRequests.put(id, request);
-            amrmClient.addContainerRequest(request);
-          }
-
-          return id;
-        }
-      }
-    };
+  protected void addContainerRequest(AMRMClient.ContainerRequest request) {
+    amrmClient.addContainerRequest(request);
   }
 
   @Override
-  public synchronized void completeContainerRequest(String id) {
-    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
-      amrmClient.removeContainerRequest(request);
-    }
+  protected void removeContainerRequest(AMRMClient.ContainerRequest request) {
+    amrmClient.removeContainerRequest(request);
   }
 
-  private Resource adjustCapability(Resource resource) {
-    int cores = YarnUtils.getVirtualCores(resource);
-    int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
-                                YarnUtils.getVirtualCores(minCapability));
-    // Try and set the virtual cores, which older versions of YARN don't support this.
-    if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
-      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
+  @Override
+  protected AllocateResult doAllocate(float progress) throws Exception {
+    AllocationResponse response = amrmClient.allocate(progress);
+    List<RunnableProcessLauncher> launchers
+      = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
+
+    for (Container container : response.getAllocatedContainers()) {
+      launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
     }
 
-    int updatedMemory = Math.min(resource.getMemory(), maxCapability.getMemory());
-    int minMemory = minCapability.getMemory();
-    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
+    List<YarnContainerStatus> completed = ImmutableList.copyOf(
+      Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
 
-    if (resource.getMemory() != updatedMemory) {
-      resource.setMemory(updatedMemory);
-      LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
-    }
+    return new AllocateResult(launchers, completed);
+  }
 
-    return resource;
+  @Override
+  protected void releaseAssignedContainer(YarnContainerInfo containerInfo) {
+    Container container = containerInfo.getContainer();
+    amrmClient.releaseAssignedContainer(container.getId());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
index 0ebc0f5..78a5135 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -19,37 +19,30 @@ package org.apache.twill.internal.yarn;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.twill.internal.ProcessLauncher;
 import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-import java.net.URL;
 import java.util.List;
-import java.util.UUID;
+import javax.annotation.Nullable;
 
 /**
  *
  */
-public final class Hadoop21YarnAMClient extends AbstractIdleService implements YarnAMClient {
+public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
 
   private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAMClient.class);
 
@@ -64,20 +57,12 @@ public final class Hadoop21YarnAMClient extends AbstractIdleService implements Y
     };
   }
 
-  private final ContainerId containerId;
-  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
   private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
   private final Hadoop21YarnNMClient nmClient;
-  private InetSocketAddress trackerAddr;
-  private URL trackerUrl;
   private Resource maxCapability;
 
   public Hadoop21YarnAMClient(Configuration conf) {
-    String masterContainerId = System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name());
-    Preconditions.checkArgument(masterContainerId != null,
-                                "Missing %s from environment", ApplicationConstants.Environment.CONTAINER_ID.name());
-    this.containerId = ConverterUtils.toContainerId(masterContainerId);
-    this.containerRequests = ArrayListMultimap.create();
+    super(ApplicationConstants.Environment.CONTAINER_ID.name());
 
     this.amrmClient = AMRMClient.createAMRMClient();
     this.amrmClient.init(conf);
@@ -105,89 +90,50 @@ public final class Hadoop21YarnAMClient extends AbstractIdleService implements Y
   }
 
   @Override
-  public ContainerId getContainerId() {
-    return containerId;
+  public String getHost() {
+    return System.getenv().get(ApplicationConstants.Environment.NM_HOST.name());
   }
 
   @Override
-  public String getHost() {
-    return System.getenv().get(ApplicationConstants.Environment.NM_HOST.name());
+  protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
+                                                               @Nullable String[] hosts, @Nullable String[] racks) {
+    return new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
   }
 
   @Override
-  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
-    this.trackerAddr = trackerAddr;
-    this.trackerUrl = trackerUrl;
+  protected void addContainerRequest(AMRMClient.ContainerRequest request) {
+    amrmClient.addContainerRequest(request);
   }
 
   @Override
-  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+  protected void removeContainerRequest(AMRMClient.ContainerRequest request) {
+    amrmClient.removeContainerRequest(request);
+  }
+
+  @Override
+  protected AllocateResult doAllocate(float progress) throws Exception {
     AllocateResponse allocateResponse = amrmClient.allocate(progress);
-    List<ProcessLauncher<YarnContainerInfo>> launchers
+    List<RunnableProcessLauncher> launchers
       = Lists.newArrayListWithCapacity(allocateResponse.getAllocatedContainers().size());
 
     for (Container container : allocateResponse.getAllocatedContainers()) {
       launchers.add(new RunnableProcessLauncher(new Hadoop21YarnContainerInfo(container), nmClient));
     }
 
-    if (!launchers.isEmpty()) {
-      handler.acquired(launchers);
-
-      // If no process has been launched through the given launcher, return the container.
-      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
-        // This cast always works.
-        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
-        if (!launcher.isLaunched()) {
-          Container container = launcher.getContainerInfo().getContainer();
-          LOG.info("Nothing to run in container, releasing it: {}", container);
-          amrmClient.releaseAssignedContainer(container.getId());
-        }
-      }
-    }
-
     List<YarnContainerStatus> completed = ImmutableList.copyOf(
       Iterables.transform(allocateResponse.getCompletedContainersStatuses(), STATUS_TRANSFORM));
-    if (!completed.isEmpty()) {
-      handler.completed(completed);
-    }
-  }
 
-  @Override
-  public ContainerRequestBuilder addContainerRequest(Resource capability) {
-    return addContainerRequest(capability, 1);
+    return new AllocateResult(launchers, completed);
   }
 
   @Override
-  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
-    return new ContainerRequestBuilder(adjustCapability(capability), count) {
-      @Override
-      public String apply() {
-        synchronized (Hadoop21YarnAMClient.this) {
-          String id = UUID.randomUUID().toString();
-
-          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
-          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
-
-          for (int i = 0; i < count; i++) {
-            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
-            containerRequests.put(id, request);
-            amrmClient.addContainerRequest(request);
-          }
-
-          return id;
-        }
-      }
-    };
+  protected void releaseAssignedContainer(YarnContainerInfo containerInfo) {
+    Container container = containerInfo.getContainer();
+    amrmClient.releaseAssignedContainer(container.getId());
   }
 
   @Override
-  public synchronized void completeContainerRequest(String id) {
-    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
-      amrmClient.removeContainerRequest(request);
-    }
-  }
-
-  private Resource adjustCapability(Resource resource) {
+  protected Resource adjustCapability(Resource resource) {
     int cores = resource.getVirtualCores();
     int updatedCores = Math.min(resource.getVirtualCores(), maxCapability.getVirtualCores());
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index f1ad20e..2b7f049 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -291,7 +291,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
     final Set<String> ids = Sets.newHashSet(runningContainers.getContainerIds());
     YarnAMClient.AllocateHandler handler = new YarnAMClient.AllocateHandler() {
       @Override
-      public void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers) {
+      public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launchers) {
         // no-op
       }
 
@@ -352,7 +352,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
 
     YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
       @Override
-      public void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers) {
+      public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launchers) {
         launchRunnable(launchers, provisioning);
       }
 
@@ -525,7 +525,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
   /**
    * Launches runnables in the provisioned containers.
    */
-  private void launchRunnable(List<ProcessLauncher<YarnContainerInfo>> launchers,
+  private void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> launchers,
                               Queue<ProvisionRequest> provisioning) {
     for (ProcessLauncher<YarnContainerInfo> processLauncher : launchers) {
       LOG.info("Got container {}", processLauncher.getContainerInfo().getId());

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
new file mode 100644
index 0000000..9718150
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+/**
+ * Abstract base for implementing YarnAMClient for different versions of hadoop.
+ *
+ * @param <T> Type of container request.
+ */
+public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implements YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnAMClient.class);
+
+  // Map from a unique ID to inflight requests
+  private final Multimap<String, T> containerRequests;
+
+  // List of requests pending to send through allocate call
+  private final List<T> requests;
+  // List of requests pending to remove through allocate call
+  private final List<T> removes;
+
+  protected final ContainerId containerId;
+  protected InetSocketAddress trackerAddr;
+  protected URL trackerUrl;
+
+  /**
+   * Constructs an instance of AMClient.
+   *
+   * @param containerIdEnvName Name of the environment variable that contains value of the AM container ID.
+   */
+  protected AbstractYarnAMClient(String containerIdEnvName) {
+    String masterContainerId = System.getenv().get(containerIdEnvName);
+    Preconditions.checkArgument(masterContainerId != null,
+                                "Missing %s from environment", containerIdEnvName);
+    this.containerId = ConverterUtils.toContainerId(masterContainerId);
+    this.containerRequests = ArrayListMultimap.create();
+    this.requests = Lists.newLinkedList();
+    this.removes = Lists.newLinkedList();
+  }
+
+
+  @Override
+  public final ContainerId getContainerId() {
+    return containerId;
+  }
+
+  @Override
+  public final void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
+    this.trackerAddr = trackerAddr;
+    this.trackerUrl = trackerUrl;
+  }
+
+  @Override
+  public final synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+    // In one allocate cycle, either only do new container request or removal of requests.
+    // This is a workaround for YARN-314.
+    // When remove a container request, AMRMClient will send a container request with size = 0
+    // With bug YARN-314, if we mix the allocate call with new container request of the same priority,
+    // in some cases the RM would not see the new request (based on sorting of resource capability),
+    // but rather only see the one with size = 0.
+    if (removes.isEmpty()) {
+      for (T request : requests) {
+        addContainerRequest(request);
+      }
+      requests.clear();
+    } else {
+      for (T request : removes) {
+        removeContainerRequest(request);
+      }
+      removes.clear();
+    }
+
+    AllocateResult allocateResponse = doAllocate(progress);
+    List<RunnableProcessLauncher> launchers = allocateResponse.getLaunchers();
+
+    if (!launchers.isEmpty()) {
+      handler.acquired(launchers);
+
+      // If no process has been launched through the given launcher, return the container.
+      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
+        // This cast always works.
+        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
+        if (!launcher.isLaunched()) {
+          YarnContainerInfo containerInfo = launcher.getContainerInfo();
+          LOG.info("Nothing to run in container, releasing it: {}", containerInfo.getContainer());
+          releaseAssignedContainer(containerInfo);
+        }
+      }
+    }
+
+    List<YarnContainerStatus> completed = allocateResponse.getCompletedStatus();
+    if (!completed.isEmpty()) {
+      handler.completed(completed);
+    }
+  }
+
+  @Override
+  public final ContainerRequestBuilder addContainerRequest(Resource capability) {
+    return addContainerRequest(capability, 1);
+  }
+
+  @Override
+  public final ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
+    return new ContainerRequestBuilder(adjustCapability(capability), count) {
+      @Override
+      public String apply() {
+        synchronized (AbstractYarnAMClient.this) {
+          String id = UUID.randomUUID().toString();
+
+          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
+          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
+
+          for (int i = 0; i < count; i++) {
+            T request = createContainerRequest(priority, capability, hosts, racks);
+            containerRequests.put(id, request);
+            requests.add(request);
+          }
+
+          return id;
+        }
+      }
+    };
+
+  }
+
+  @Override
+  public final synchronized void completeContainerRequest(String id) {
+    for (T request : containerRequests.removeAll(id)) {
+      removes.add(request);
+    }
+  }
+
+  /**
+   * Adjusts the given resource capability to fit in the cluster limit.
+   *
+   * @param capability The capability to be adjusted.
+   * @return A {@link Resource} instance representing the adjusted result.
+   */
+  protected abstract Resource adjustCapability(Resource capability);
+
+  /**
+   * Creates a container request based on the given requirement.
+   *
+   * @param priority The priority of the request.
+   * @param capability The resource capability.
+   * @param hosts Sets of hosts. Could be {@code null}.
+   * @param racks Sets of racks. Could be {@code null}.
+   * @return A container request.
+   */
+  protected abstract T createContainerRequest(Priority priority, Resource capability,
+                                              @Nullable String[] hosts, @Nullable String[] racks);
+
+  /**
+   * Adds the given request to prepare for next allocate call.
+   */
+  protected abstract void addContainerRequest(T request);
+
+  /**
+   * Removes the given request to prepare for the next allocate call.
+   */
+  protected abstract void removeContainerRequest(T request);
+
+  /**
+   * Performs actual allocate call to RM.
+   */
+  protected abstract AllocateResult doAllocate(float progress) throws Exception;
+
+  /**
+   * Releases the given container back to RM.
+   */
+  protected abstract void releaseAssignedContainer(YarnContainerInfo containerInfo);
+
+  /**
+   * Class for carrying results for the {@link #doAllocate(float)} call.
+   */
+  protected static final class AllocateResult {
+    private final List<RunnableProcessLauncher> launchers;
+    private final List<YarnContainerStatus> completedStatus;
+
+    public AllocateResult(List<RunnableProcessLauncher> launchers, List<YarnContainerStatus> completedStatus) {
+      this.launchers = ImmutableList.copyOf(launchers);
+      this.completedStatus = ImmutableList.copyOf(completedStatus);
+    }
+
+    public List<RunnableProcessLauncher> getLaunchers() {
+      return launchers;
+    }
+
+    public List<YarnContainerStatus> getCompletedStatus() {
+      return completedStatus;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
index 370ca3c..6a5ee36 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
@@ -19,7 +19,6 @@ package org.apache.twill.internal.yarn;
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Service;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -29,7 +28,6 @@ import org.apache.twill.internal.ProcessLauncher;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -95,7 +93,7 @@ public interface YarnAMClient extends Service {
    */
   // TODO: Move AM heartbeat logic into this interface so AM only needs to handle callback.
   interface AllocateHandler {
-    void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers);
+    void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launchers);
 
     void completed(List<YarnContainerStatus> completed);
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
new file mode 100644
index 0000000..42d332d
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.yarn;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.discovery.ServiceDiscovered;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test for requesting different container size in different order.
+ * It specifically test for workaround for YARN-314.
+ */
+public class ContainerSizeTestRun extends BaseYarnTest {
+
+  @Test
+  public void testContainerSize() throws InterruptedException {
+    TwillRunner runner = YarnTestUtils.getTwillRunner();
+    TwillController controller = runner.prepare(new SleepApp())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .start();
+
+    try {
+      ServiceDiscovered discovered = controller.discoverService("sleep");
+      Assert.assertTrue(YarnTestUtils.waitForSize(discovered, 2, 60));
+    } finally {
+      controller.stopAndWait();
+    }
+  }
+
+
+  /**
+   * An application that has two runnables with different memory size.
+   */
+  public static final class SleepApp implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      ResourceSpecification largeRes = ResourceSpecification.Builder.with()
+        .setVirtualCores(1)
+        .setMemory(1024, ResourceSpecification.SizeUnit.MEGA)
+        .build();
+
+      ResourceSpecification smallRes = ResourceSpecification.Builder.with()
+        .setVirtualCores(1)
+        .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
+        .build();
+
+      return TwillSpecification.Builder.with()
+        .setName("SleepApp")
+        .withRunnable()
+          .add("sleep1", new SleepRunnable(12345), largeRes).noLocalFiles()
+          .add("sleep2", new SleepRunnable(12346), smallRes).noLocalFiles()
+        .withOrder()
+          .begin("sleep1")
+          .nextWhenStarted("sleep2")
+        .build();
+    }
+  }
+
+
+  /**
+   * A runnable that sleep for 30 seconds.
+   */
+  public static final class SleepRunnable extends AbstractTwillRunnable {
+
+    private volatile Thread runThread;
+
+    public SleepRunnable(int port) {
+      super(ImmutableMap.of("port", Integer.toString(port)));
+    }
+
+    @Override
+    public void run() {
+      runThread = Thread.currentThread();
+      Random random = new Random();
+      getContext().announce("sleep", Integer.parseInt(getContext().getSpecification().getConfigs().get("port")));
+      try {
+        TimeUnit.SECONDS.sleep(30);
+      } catch (InterruptedException e) {
+        // Ignore.
+      }
+    }
+
+    @Override
+    public void stop() {
+      if (runThread != null) {
+        runThread.interrupt();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
index 4d45ad1..f5ead6b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
@@ -23,12 +23,14 @@ import org.apache.twill.api.TwillRunner;
 import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.api.logging.LogThrowable;
+import org.apache.twill.api.logging.PrinterLogHandler;
 import org.apache.twill.common.Services;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.PrintWriter;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
@@ -62,6 +64,7 @@ public class LogHandlerTestRun extends BaseYarnTest {
 
     TwillRunner runner = YarnTestUtils.getTwillRunner();
     TwillController controller = runner.prepare(new LogRunnable())
+                                       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
                                        .addLogHandler(logHandler)
                                        .start();