You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/10/08 02:04:34 UTC

[1/6] TWILL-87 : Adding Container Placement Policy control.

Repository: incubator-twill
Updated Branches:
  refs/heads/site 0004452d8 -> 128a16f35


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
index 9718150..cb6a58e 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import javax.annotation.Nullable;
 
@@ -54,6 +56,16 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
   private final List<T> requests;
   // List of requests pending to remove through allocate call
   private final List<T> removes;
+  //List of pending blacklist additions for the next allocate call
+  private final List<String> blacklistAdditions;
+  //List of pending blacklist removals for the next allocate call
+  private final List<String> blacklistRemovals;
+  //Keep track of blacklisted resources
+  private final List<String> blacklistedResources;
+  /**
+   * Contains a list of known unsupported features.
+   */
+  protected final Set<String> unsupportedFeatures = Sets.newHashSet();
 
   protected final ContainerId containerId;
   protected InetSocketAddress trackerAddr;
@@ -72,6 +84,9 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
     this.containerRequests = ArrayListMultimap.create();
     this.requests = Lists.newLinkedList();
     this.removes = Lists.newLinkedList();
+    this.blacklistAdditions = Lists.newArrayList();
+    this.blacklistRemovals = Lists.newArrayList();
+    this.blacklistedResources = Lists.newArrayList();
   }
 
 
@@ -106,6 +121,12 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
       removes.clear();
     }
 
+    if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
+      updateBlacklist(blacklistAdditions, blacklistRemovals);
+      blacklistAdditions.clear();
+      blacklistRemovals.clear();
+    }
+
     AllocateResult allocateResponse = doAllocate(progress);
     List<RunnableProcessLauncher> launchers = allocateResponse.getLaunchers();
 
@@ -147,7 +168,7 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
           String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
 
           for (int i = 0; i < count; i++) {
-            T request = createContainerRequest(priority, capability, hosts, racks);
+            T request = createContainerRequest(priority, capability, hosts, racks, relaxLocality);
             containerRequests.put(id, request);
             requests.add(request);
           }
@@ -160,6 +181,31 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
   }
 
   @Override
+  public final void addToBlacklist(String resource) {
+    if (!blacklistAdditions.contains(resource) && !blacklistedResources.contains(resource)) {
+      blacklistAdditions.add(resource);
+      blacklistedResources.add(resource);
+      blacklistRemovals.remove(resource);
+    }
+  }
+
+  @Override
+  public final void removeFromBlacklist(String resource) {
+    if (!blacklistRemovals.contains(resource) && blacklistedResources.contains(resource)) {
+      blacklistRemovals.add(resource);
+      blacklistedResources.remove(resource);
+      blacklistAdditions.remove(resource);
+    }
+  }
+
+  @Override
+  public final void clearBlacklist() {
+    blacklistRemovals.addAll(blacklistedResources);
+    blacklistedResources.clear();
+    blacklistAdditions.clear();
+  }
+
+  @Override
   public final synchronized void completeContainerRequest(String id) {
     for (T request : containerRequests.removeAll(id)) {
       removes.add(request);
@@ -167,6 +213,19 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
   }
 
   /**
+   * Records an unsupported feature.
+   * @param unsupportedFeature A string identifying an unsupported feature.
+   * @return {@code false} if the feature has already been recorded, {@code true} otherwise.
+   */
+  protected boolean recordUnsupportedFeature(String unsupportedFeature) {
+    if (unsupportedFeatures.contains(unsupportedFeature)) {
+      return false;
+    }
+    unsupportedFeatures.add(unsupportedFeature);
+    return true;
+  }
+
+  /**
    * Adjusts the given resource capability to fit in the cluster limit.
    *
    * @param capability The capability to be adjusted.
@@ -181,10 +240,12 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
    * @param capability The resource capability.
    * @param hosts Sets of hosts. Could be {@code null}.
    * @param racks Sets of racks. Could be {@code null}.
+   * @param relaxLocality If set to {@code false}, locality constraints will not be relaxed.
    * @return A container request.
    */
   protected abstract T createContainerRequest(Priority priority, Resource capability,
-                                              @Nullable String[] hosts, @Nullable String[] racks);
+                                              @Nullable String[] hosts, @Nullable String[] racks,
+                                              boolean relaxLocality);
 
   /**
    * Adds the given request to prepare for next allocate call.
@@ -197,6 +258,11 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
   protected abstract void removeContainerRequest(T request);
 
   /**
+   * Sends blacklist updates in the next allocate call.
+   */
+  protected abstract void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals);
+
+  /**
    * Performs actual allocate call to RM.
    */
   protected abstract AllocateResult doAllocate(float progress) throws Exception;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
index 6f47b6c..d932e77 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
@@ -36,14 +36,23 @@ public final class VersionDetectYarnAMClientFactory implements YarnAMClientFacto
   public YarnAMClient create() {
     try {
       Class<YarnAMClient> clz;
-      if (YarnUtils.isHadoop20()) {
-        // Uses hadoop-2.0 class
-        String clzName = getClass().getPackage().getName() + ".Hadoop20YarnAMClient";
-        clz = (Class<YarnAMClient>) Class.forName(clzName);
-      } else {
-        // Uses hadoop-2.1 class
-        String clzName = getClass().getPackage().getName() + ".Hadoop21YarnAMClient";
-        clz = (Class<YarnAMClient>) Class.forName(clzName);
+      String clzName;
+      switch (YarnUtils.getHadoopVersion()) {
+        case HADOOP_20:
+          // Uses hadoop-2.0 class
+          clzName = getClass().getPackage().getName() + ".Hadoop20YarnAMClient";
+          clz = (Class<YarnAMClient>) Class.forName(clzName);
+          break;
+        case HADOOP_21:
+          // Uses hadoop-2.1 class
+          clzName = getClass().getPackage().getName() + ".Hadoop21YarnAMClient";
+          clz = (Class<YarnAMClient>) Class.forName(clzName);
+          break;
+        default:
+          // Uses hadoop-2.2 class
+          clzName = getClass().getPackage().getName() + ".Hadoop22YarnAMClient";
+          clz = (Class<YarnAMClient>) Class.forName(clzName);
+          break;
       }
 
       return clz.getConstructor(Configuration.class).newInstance(conf);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
index f9db959..04bd30d 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -31,7 +31,7 @@ public final class VersionDetectYarnAppClientFactory implements YarnAppClientFac
     try {
       Class<YarnAppClient> clz;
 
-      if (YarnUtils.isHadoop20()) {
+      if (YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_20)) {
         // Uses hadoop-2.0 class.
         String clzName = getClass().getPackage().getName() + ".Hadoop20YarnAppClient";
         clz = (Class<YarnAppClient>) Class.forName(clzName);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
index a5a061a..a10181e 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
@@ -47,10 +47,12 @@ public interface YarnAMClient extends Service {
     protected final Set<String> hosts = Sets.newHashSet();
     protected final Set<String> racks = Sets.newHashSet();
     protected final Priority priority = Records.newRecord(Priority.class);
+    protected boolean relaxLocality;
 
     protected ContainerRequestBuilder(Resource capability, int count) {
       this.capability = capability;
       this.count = count;
+      this.relaxLocality = true;
     }
 
     public ContainerRequestBuilder addHosts(Collection<String> newHosts) {
@@ -66,6 +68,11 @@ public interface YarnAMClient extends Service {
       return this;
     }
 
+    public ContainerRequestBuilder setRelaxLocality(boolean relaxLocality) {
+      this.relaxLocality = relaxLocality;
+      return this;
+    }
+
     /**
      * Adds a container request. Returns an unique ID for the request.
      */
@@ -83,6 +90,8 @@ public interface YarnAMClient extends Service {
 
   String getHost();
 
+  int getNMPort();
+
   /**
    * Sets the tracker address and tracker url. This method should be called before calling {@link #start()}.
    */
@@ -104,6 +113,12 @@ public interface YarnAMClient extends Service {
 
   ContainerRequestBuilder addContainerRequest(Resource capability, int count);
 
+  void addToBlacklist(String resource);
+
+  void removeFromBlacklist(String resource);
+
+  void clearBlacklist();
+
   /**
    * Notify a container request is fulfilled.
    *

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
index 97236f6..3b03a2a 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
@@ -19,10 +19,13 @@ package org.apache.twill.internal.yarn;
 
 import com.google.common.util.concurrent.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
 
+import java.util.List;
+
 /**
  * Interface for launching Yarn application from client.
  */
@@ -42,4 +45,11 @@ public interface YarnAppClient extends Service {
   ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception;
 
   ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId);
+
+  /**
+   * Returns a list of {@link org.apache.hadoop.yarn.api.records.NodeReport} for the nodes in the cluster.
+   * @return a list of {@link org.apache.hadoop.yarn.api.records.NodeReport} for the nodes in the cluster.
+   * @throws Exception Propagates exceptions thrown by {@link org.apache.hadoop.yarn.client.api.YarnClient}.
+   */
+  List<NodeReport> getNodeReports() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index be30f33..1c65591 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -21,6 +21,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,8 +58,17 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class YarnUtils {
 
+  /**
+   * Defines different versions of Hadoop.
+   */
+  public enum HadoopVersions {
+    HADOOP_20,
+    HADOOP_21,
+    HADOOP_22
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
-  private static final AtomicReference<Boolean> HADOOP_20 = new AtomicReference<Boolean>();
+  private static final AtomicReference<HadoopVersions> HADOOP_VERSION = new AtomicReference<HadoopVersions>();
 
   public static YarnLocalResource createLocalResource(LocalFile localFile) {
     Preconditions.checkArgument(localFile.getLastModified() >= 0, "Last modified time should be >= 0.");
@@ -202,21 +212,27 @@ public class YarnUtils {
   }
 
   /**
-   * Returns true if Hadoop-2.0 classes are in the classpath.
+   * Returns {@link org.apache.twill.internal.yarn.YarnUtils.HadoopVersions} for the current build profile,
+   * depending on the classes in the classpath.
+   * @return The version of Hadoop for the current build profile.
    */
-  public static boolean isHadoop20() {
-    Boolean hadoop20 = HADOOP_20.get();
-    if (hadoop20 != null) {
-      return hadoop20;
+  public static HadoopVersions getHadoopVersion() {
+    HadoopVersions hadoopVersion = HADOOP_VERSION.get();
+    if (hadoopVersion != null) {
+      return hadoopVersion;
     }
     try {
       Class.forName("org.apache.hadoop.yarn.client.api.NMClient");
-      HADOOP_20.set(false);
-      return false;
+      try {
+        Class.forName("org.apache.hadoop.yarn.client.cli.LogsCLI");
+        HADOOP_VERSION.set(HadoopVersions.HADOOP_22);
+      } catch (ClassNotFoundException e) {
+        HADOOP_VERSION.set(HadoopVersions.HADOOP_21);
+      }
     } catch (ClassNotFoundException e) {
-      HADOOP_20.set(true);
-      return true;
+      HADOOP_VERSION.set(HadoopVersions.HADOOP_20);
     }
+    return HADOOP_VERSION.get();
   }
 
   /**
@@ -225,7 +241,7 @@ public class YarnUtils {
   private static <T> T createAdapter(Class<T> clz) {
     String className = clz.getPackage().getName();
 
-    if (isHadoop20()) {
+    if (getHadoopVersion().equals(HadoopVersions.HADOOP_20)) {
       className += ".Hadoop20" + clz.getSimpleName();
     } else {
       className += ".Hadoop21" + clz.getSimpleName();

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 36b4f0b..0e0fc75 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -455,8 +455,8 @@ final class YarnTwillPreparer implements TwillPreparer {
       }
 
       TwillSpecificationAdapter.create().toJson(
-        new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(), eventHandler),
-        writer);
+        new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(), spec.getPlacementPolicies(),
+                                      eventHandler), writer);
     } finally {
       writer.close();
     }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
new file mode 100644
index 0000000..8981b66
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.twill.api.Hosts;
+import org.apache.twill.api.Racks;
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.discovery.ServiceDiscovered;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for placement policies.
+ */
+public class PlacementPolicyTestRun extends BaseYarnTest {
+  private static final int RUNNABLE_MEMORY = 512;
+  private static final int RUNNABLE_CORES = 1;
+
+  private static List<NodeReport> nodeReports;
+  private static ResourceSpecification resource;
+  private static ResourceSpecification twoInstancesResource;
+
+  /**
+   * Verify the cluster configuration (number and capability of node managers) required for the tests.
+   */
+  @BeforeClass
+  public static void verifyClusterCapability() {
+    // Ignore verifications if it is running against older Hadoop versions which do not support blacklists.
+    Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
+
+    // All runnables in this test class use same resource specification for the sake of convenience.
+    resource = ResourceSpecification.Builder.with()
+      .setVirtualCores(RUNNABLE_CORES)
+      .setMemory(RUNNABLE_MEMORY, ResourceSpecification.SizeUnit.MEGA)
+      .build();
+    twoInstancesResource = ResourceSpecification.Builder.with()
+      .setVirtualCores(RUNNABLE_CORES)
+      .setMemory(RUNNABLE_MEMORY, ResourceSpecification.SizeUnit.MEGA)
+      .setInstances(2)
+      .build();
+
+    try {
+      nodeReports = YarnTestUtils.getNodeReports();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    // The tests need exactly three NodeManagers in the cluster.
+    Assert.assertNotNull(nodeReports);
+    Assert.assertEquals(nodeReports.size(), 3);
+
+    // All NodeManagers should have enough capacity available to accommodate at least two runnables.
+    for (NodeReport nodeReport : nodeReports) {
+      Resource capability = nodeReport.getCapability();
+      Resource used = nodeReport.getUsed();
+      Assert.assertNotNull(capability);
+      if (used != null) {
+        Assert.assertTrue(2 * resource.getMemorySize() < capability.getMemory() - used.getMemory());
+      } else {
+        Assert.assertTrue(2 * resource.getMemorySize() < capability.getMemory());
+      }
+    }
+  }
+
+  /**
+   * Test to verify placement policy without dynamically changing number of instances.
+   */
+  @Test
+  public void testPlacementPolicy() throws Exception {
+    // Ignore test if it is running against older Hadoop versions which do not support blacklists.
+    Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
+
+    ServiceDiscovered serviceDiscovered;
+    ResourceReport resourceReport;
+    Set<Integer> nmPorts = Sets.newHashSet();
+    Collection<TwillRunResources> distributedResource;
+
+    Assert.assertEquals(getProvisionedNodeManagerCount(), 0);
+    TwillRunner runner = YarnTestUtils.getTwillRunner();
+    TwillController controller = runner.prepare(new PlacementPolicyApplication())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .withApplicationArguments("PlacementPolicyTest")
+      .withArguments("hostRunnable", "host")
+      .withArguments("hostRackRunnable", "hostRack")
+      .withArguments("distributedRunnable", "distributed")
+      .start();
+
+    try {
+      // All runnables should get started.
+      serviceDiscovered = controller.discoverService("PlacementPolicyTest");
+      Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 4, 80));
+
+      // DISTRIBUTED runnables should be provisioned on different nodes.
+      Assert.assertTrue(getProvisionedNodeManagerCount() >= 2);
+    } finally {
+      controller.stopAndWait();
+    }
+
+    // Sleep a bit before exiting.
+    TimeUnit.SECONDS.sleep(2);
+  }
+
+  /**
+   * An application that specifies runnables with different placement policies.
+   */
+  public static final class PlacementPolicyApplication implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("PlacementPolicyApplication")
+        .withRunnable()
+          .add("hostRunnable", new EchoServer(), resource).noLocalFiles()
+          .add("hostRackRunnable", new EchoServer(), resource).noLocalFiles()
+          .add("distributedRunnable", new EchoServer(), twoInstancesResource).noLocalFiles()
+        .withPlacementPolicy()
+          .add(Hosts.of(nodeReports.get(0).getHttpAddress()), "hostRunnable")
+          .add(Hosts.of(nodeReports.get(1).getHttpAddress()), Racks.of("/default-rack"), "hostRackRunnable")
+          .add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "distributedRunnable")
+        .anyOrder()
+        .build();
+    }
+  }
+
+  /**
+   * Test to verify DISTRIBUTED placement policies are taken care of when the number of instances is changed.
+   * Also, verifies that DISTRIBUTED placement policies do not affect other runnables.
+   */
+  @Test
+  public void testDistributedPlacementPolicy() throws Exception {
+    // Ignore test if it is running against older Hadoop versions which do not support blacklists.
+    Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
+
+    ServiceDiscovered serviceDiscovered;
+    ResourceReport resourceReport;
+    Set<Integer> nmPorts = Sets.newHashSet();
+    Collection<TwillRunResources> aliceResources;
+    Collection<TwillRunResources> bobResources;
+
+    Assert.assertEquals(getProvisionedNodeManagerCount(), 0);
+    TwillRunner runner = YarnTestUtils.getTwillRunner();
+    TwillController controller = runner.prepare(new DistributedApplication())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .withApplicationArguments("DistributedTest")
+      .withArguments("Alice", "alice")
+      .withArguments("Bob", "bob")
+      .withArguments("Eve", "eve")
+      .start();
+
+    try {
+      // All runnables should get started with DISTRIBUTED ones being on different nodes.
+      serviceDiscovered = controller.discoverService("DistributedTest");
+      Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 3, 60));
+      Assert.assertTrue(getProvisionedNodeManagerCount() >= 2);
+
+      // Spawning a new instance for DISTRIBUTED runnable Alice, which should get a different node.
+      controller.changeInstances("Alice", 2);
+      serviceDiscovered = controller.discoverService("DistributedTest");
+      Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 4, 60));
+      Assert.assertTrue(getProvisionedNodeManagerCount() >= 3);
+
+      // Spawning a new instance for DEFAULT runnable Eve,
+      // which should not be affected by placement policies of previous runnables.
+      controller.changeInstances("Eve", 2);
+      serviceDiscovered = controller.discoverService("DistributedTest");
+      Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 5, 60));
+
+      // Spawning a new instance for DISTRIBUTED runnable Bob,
+      // which will be forced to give up its placement policy restrictions, since there are only three nodes.
+      controller.changeInstances("Bob", 2);
+      serviceDiscovered = controller.discoverService("DistributedTest");
+      Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 6, 60));
+      Assert.assertTrue(getProvisionedNodeManagerCount() >= 3);
+    } finally {
+      controller.stopAndWait();
+    }
+
+    // Sleep a bit before exiting.
+    TimeUnit.SECONDS.sleep(2);
+  }
+
+  /**
+   * An application that runs three runnables, with a DISTRIBUTED placement policy for two of them.
+   */
+  public static final class DistributedApplication implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("DistributedApplication")
+        .withRunnable()
+          .add("Alice", new EchoServer(), resource).noLocalFiles()
+          .add("Bob", new EchoServer(), resource).noLocalFiles()
+          .add("Eve", new EchoServer(), resource).noLocalFiles()
+        .withPlacementPolicy()
+          .add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Alice", "Bob")
+        .anyOrder()
+        .build();
+    }
+  }
+
+  /**
+   * Test to verify changing instances during application run works for DISTRIBUTED runnables.
+   */
+  @Test
+  public void testChangeInstance() throws InterruptedException {
+    // Ignore test if it is running against older Hadoop versions which do not support blacklists.
+    Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
+
+    ServiceDiscovered serviceDiscovered;
+    ResourceReport resourceReport;
+    Set<Integer> nmPorts = Sets.newHashSet();
+    Collection<TwillRunResources> aliceResources;
+    Collection<TwillRunResources> bobResources;
+
+    TwillRunner runner = YarnTestUtils.getTwillRunner();
+    TwillController controller = runner.prepare(new ChangeInstanceApplication())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .withApplicationArguments("DistributedTest")
+      .withArguments("Alice", "alice")
+      .withArguments("Bob", "bob")
+      .withArguments("Eve", "eve")
+      .start();
+
+    try {
+      // All runnables should get started.
+      serviceDiscovered = controller.discoverService("DistributedTest");
+      Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 4, 60));
+
+      // Increasing the instance count for runnable Alice by 2.
+      controller.changeInstances("Alice", 4);
+      serviceDiscovered = controller.discoverService("DistributedTest");
+      Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 6, 60));
+
+      // Decreasing instance count for runnable Alice by 3.
+      controller.changeInstances("Alice", 1);
+      serviceDiscovered = controller.discoverService("DistributedTest");
+      Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 3, 60));
+
+      // Increasing instance count for runnable Bob by 2.
+      controller.changeInstances("Bob", 3);
+      serviceDiscovered = controller.discoverService("DistributedTest");
+      Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 5, 60));
+
+      // Increasing instance count for runnable Eve by 2.
+      controller.changeInstances("Eve", 3);
+      serviceDiscovered = controller.discoverService("DistributedTest");
+      Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 7, 60));
+    } finally {
+      controller.stopAndWait();
+    }
+
+    // Sleep a bit before exiting.
+    TimeUnit.SECONDS.sleep(2);
+  }
+
+  /**
+   * An application that runs three runnables, with a DISTRIBUTED placement policy for two of them.
+   */
+  public static final class ChangeInstanceApplication implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("DistributedApplication")
+        .withRunnable()
+        .add("Alice", new EchoServer(), twoInstancesResource).noLocalFiles()
+        .add("Bob", new EchoServer(), resource).noLocalFiles()
+        .add("Eve", new EchoServer(), resource).noLocalFiles()
+        .withPlacementPolicy()
+        .add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Alice", "Bob")
+        .anyOrder()
+        .build();
+    }
+  }
+
+  /**
+   * Test to verify exception is thrown in case a non-existent runnable is specified in a placement policy.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testNonExistentRunnable() {
+    TwillRunner runner = YarnTestUtils.getTwillRunner();
+    TwillController controller = runner.prepare(new FaultyApplication())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .start();
+      controller.stopAndWait();
+  }
+
+  /**
+   * An application that uses non-existent runnable name while specifying placement policies.
+   */
+  public static final class FaultyApplication implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("FaultyApplication")
+        .withRunnable()
+          .add("Hermione", new EchoServer(), resource).noLocalFiles()
+          .add("Harry", new EchoServer(), resource).noLocalFiles()
+          .add("Ron", new EchoServer(), resource).noLocalFiles()
+        .withPlacementPolicy()
+          .add(TwillSpecification.PlacementPolicy.Type.DEFAULT, "Hermione", "Ron")
+          .add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Draco", "Harry")
+          .anyOrder()
+        .build();
+    }
+  }
+
+  /**
+   * Test to verify exception is thrown in case a runnable is mentioned in more than one placement policy.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testPlacementPolicySpecification() {
+    TwillRunner runner = YarnTestUtils.getTwillRunner();
+    TwillController controller = runner.prepare(new BadApplication())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .start();
+      controller.stopAndWait();
+  }
+
+  /**
+   * An application that specifies a runnable name in more than one placement policy.
+   */
+  public static final class BadApplication implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("BadApplication")
+        .withRunnable()
+          .add("Hermione", new EchoServer(), resource).noLocalFiles()
+          .add("Harry", new EchoServer(), resource).noLocalFiles()
+          .add("Ron", new EchoServer(), resource).noLocalFiles()
+        .withPlacementPolicy()
+          .add(TwillSpecification.PlacementPolicy.Type.DEFAULT, "Hermione", "Harry")
+          .add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Hermione", "Ron")
+        .anyOrder()
+        .build();
+    }
+  }
+
+  /**
+   * Helper function to verify DISTRIBUTED placement policies.
+   * Returns the number of NodeManagers on which runnables got provisioned.
+   * @return number of NodeManagers on which runnables got provisioned.
+   */
+  private static int getProvisionedNodeManagerCount() throws Exception {
+    int provisionedNodeManagerCount = 0;
+    for (NodeReport nodeReport : YarnTestUtils.getNodeReports()) {
+      Resource used = nodeReport.getUsed();
+      if (used != null && used.getMemory() > 0) {
+          provisionedNodeManagerCount++;
+      }
+    }
+    return provisionedNodeManagerCount;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index b8a3915..87c380f 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -35,7 +35,8 @@ import org.junit.runners.Suite;
                       LogHandlerTestRun.class,
                       SessionExpireTestRun.class,
                       ServiceDiscoveryTestRun.class,
-                      DebugTestRun.class
+                      DebugTestRun.class,
+                      PlacementPolicyTestRun.class
                     })
 public final class YarnTestSuite {
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
index bdf97e6..310d097 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
@@ -21,10 +21,13 @@ import com.google.common.collect.Iterables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.twill.api.TwillRunner;
 import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
+import org.apache.twill.internal.yarn.YarnAppClient;
 import org.apache.twill.internal.yarn.YarnUtils;
 import org.apache.twill.internal.zookeeper.InMemoryZKServer;
 import org.junit.rules.TemporaryFolder;
@@ -33,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -47,6 +51,7 @@ public final class YarnTestUtils {
   private static MiniYARNCluster cluster;
   private static TwillRunnerService runnerService;
   private static YarnConfiguration config;
+  private static YarnAppClient yarnAppClient;
 
   private static final AtomicBoolean once = new AtomicBoolean(false);
 
@@ -85,7 +90,7 @@ public final class YarnTestUtils {
 
     Configuration conf = new YarnConfiguration(dfsCluster.getFileSystem().getConf());
 
-    if (YarnUtils.isHadoop20()) {
+    if (YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_20)) {
       conf.set("yarn.resourcemanager.scheduler.class",
                  "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
     } else {
@@ -93,13 +98,14 @@ public final class YarnTestUtils {
                  "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
       conf.set("yarn.scheduler.capacity.resource-calculator",
                  "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
+      conf.setBoolean("yarn.scheduler.include-port-in-node-name", true);
     }
     conf.set("yarn.nodemanager.vmem-pmem-ratio", "20.1");
     conf.set("yarn.nodemanager.vmem-check-enabled", "false");
     conf.set("yarn.scheduler.minimum-allocation-mb", "128");
     conf.set("yarn.nodemanager.delete.debug-delay-sec", "3600");
 
-    cluster = new MiniYARNCluster("test-cluster", 1, 1, 1);
+    cluster = new MiniYARNCluster("test-cluster", 3, 1, 1);
     cluster.init(conf);
     cluster.start();
 
@@ -107,6 +113,9 @@ public final class YarnTestUtils {
 
     runnerService = createTwillRunnerService();
     runnerService.startAndWait();
+
+    yarnAppClient = new VersionDetectYarnAppClientFactory().create(conf);
+    yarnAppClient.start();
   }
 
   public static final boolean finish() {
@@ -115,6 +124,7 @@ public final class YarnTestUtils {
       cluster.stop();
       dfsCluster.shutdown();
       zkServer.stopAndWait();
+      yarnAppClient.stop();
 
       return true;
     }
@@ -140,6 +150,15 @@ public final class YarnTestUtils {
     return runner;
   }
 
+  /**
+   * Returns {@link NodeReport} for the nodes in the {@link MiniYARNCluster}.
+   * @return a list of {@link NodeReport} for the nodes in the cluster.
+   * @throws Exception Propagates exceptions thrown by {@link YarnAppClient#getNodeReports()}.
+   */
+  public static final List<NodeReport> getNodeReports() throws Exception {
+    return yarnAppClient.getNodeReports();
+  }
+
   public static final <T> boolean waitForSize(Iterable<T> iterable, int count, int limit) throws InterruptedException {
     int trial = 0;
     int size = Iterables.size(iterable);


[2/6] git commit: TWILL-87 : Adding Container Placement Policy control.

Posted by ch...@apache.org.
TWILL-87 : Adding Container Placement Policy control.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/8cf4cd02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/8cf4cd02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/8cf4cd02

Branch: refs/heads/site
Commit: 8cf4cd02c922595c703eff4076ca74633421b301
Parents: 34af22b
Author: Gourav Khaneja <go...@gmail.com>
Authored: Sat Aug 2 00:25:02 2014 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Sat Aug 2 23:04:56 2014 -0700

----------------------------------------------------------------------
 pom.xml                                         |   2 +
 .../main/java/org/apache/twill/api/Hosts.java   |  65 +++
 .../main/java/org/apache/twill/api/Racks.java   |  65 +++
 .../apache/twill/api/ResourceSpecification.java |  58 +--
 .../apache/twill/api/TwillSpecification.java    | 175 ++++++++-
 .../internal/DefaultResourceSpecification.java  |  25 --
 .../internal/DefaultTwillSpecification.java     |  86 +++-
 .../org/apache/twill/internal/Constants.java    |   6 +
 .../json/ResourceSpecificationCodec.java        |   6 +-
 .../json/TwillSpecificationAdapter.java         |   3 +
 .../internal/json/TwillSpecificationCodec.java  |  36 +-
 .../json/ResourceSpecificationCodecTest.java    |  19 +-
 .../internal/yarn/Hadoop20YarnAMClient.java     |  17 +-
 .../internal/yarn/Hadoop20YarnAppClient.java    |   7 +
 .../internal/yarn/Hadoop21YarnAMClient.java     |  28 +-
 .../internal/yarn/Hadoop21YarnAppClient.java    |   8 +
 .../internal/yarn/Hadoop22YarnAMClient.java     |  42 ++
 .../appmaster/AllocationSpecification.java      | 105 +++++
 .../appmaster/ApplicationMasterService.java     | 169 +++++++-
 .../appmaster/PlacementPolicyManager.java       | 104 +++++
 .../internal/appmaster/ProvisionRequest.java    |  11 +
 .../appmaster/RunnableContainerRequest.java     |  11 +-
 .../appmaster/RunnableProcessLauncher.java      |   5 +-
 .../internal/appmaster/RunningContainers.java   |  54 +++
 .../internal/yarn/AbstractYarnAMClient.java     |  70 +++-
 .../yarn/VersionDetectYarnAMClientFactory.java  |  25 +-
 .../yarn/VersionDetectYarnAppClientFactory.java |   2 +-
 .../twill/internal/yarn/YarnAMClient.java       |  15 +
 .../twill/internal/yarn/YarnAppClient.java      |  10 +
 .../apache/twill/internal/yarn/YarnUtils.java   |  38 +-
 .../apache/twill/yarn/YarnTwillPreparer.java    |   4 +-
 .../twill/yarn/PlacementPolicyTestRun.java      | 391 +++++++++++++++++++
 .../org/apache/twill/yarn/YarnTestSuite.java    |   3 +-
 .../org/apache/twill/yarn/YarnTestUtils.java    |  23 +-
 34 files changed, 1522 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dfb7394..67ab4fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -508,6 +508,7 @@
                                 <configuration>
                                     <sources>
                                         <source>src/main/hadoop21</source>
+                                        <source>src/main/hadoop22</source>
                                     </sources>
                                 </configuration>
                             </execution>
@@ -552,6 +553,7 @@
                                 <configuration>
                                     <sources>
                                         <source>src/main/hadoop21</source>
+                                        <source>src/main/hadoop22</source>
                                     </sources>
                                 </configuration>
                             </execution>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/Hosts.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Hosts.java b/twill-api/src/main/java/org/apache/twill/api/Hosts.java
new file mode 100644
index 0000000..90a27ad
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/Hosts.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.api;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Represents an immutable set of hosts.
+ */
+public class Hosts {
+  private final Set<String> hosts;
+
+  /** Creates an instance holding an immutable copy of the given set of hosts. */
+  public Hosts(Set<String> hosts) {
+    this.hosts = ImmutableSet.copyOf(hosts);
+  }
+
+  /** Creates an instance holding {@code host} plus every element of {@code moreHosts}. */
+  public Hosts(String host, String...moreHosts) {
+    ImmutableSet.Builder<String> collected = ImmutableSet.builder();
+    collected.add(host);
+    collected.addAll(Arrays.asList(moreHosts));
+    this.hosts = collected.build();
+  }
+
+  /**
+   * Convenience factory for {@link org.apache.twill.api.Hosts}, equivalent to calling the varargs constructor.
+   * @param host A host to be added.
+   * @param moreHosts Additional hosts to be added.
+   * @return An instance of {@link org.apache.twill.api.Hosts} containing specified hosts.
+   */
+  public static Hosts of(String host, String...moreHosts) {
+    return new Hosts(host, moreHosts);
+  }
+
+  /**
+   * @return the immutable set of hosts held by this instance.
+   */
+  public Set<String> get() {
+    return hosts;
+  }
+
+  @Override
+  public String toString() {
+    return hosts.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/Racks.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Racks.java b/twill-api/src/main/java/org/apache/twill/api/Racks.java
new file mode 100644
index 0000000..04eedd0
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/Racks.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.api;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Represents an immutable set of racks.
+ */
+public class Racks {
+  private final Set<String> racks;
+
+  /** Creates an instance holding an immutable copy of the given set of racks. */
+  public Racks(Set<String> racks) {
+    this.racks = ImmutableSet.copyOf(racks);
+  }
+
+  /** Creates an instance holding {@code rack} plus every element of {@code moreRacks}. */
+  public Racks(String rack, String...moreRacks) {
+    ImmutableSet.Builder<String> collected = ImmutableSet.builder();
+    collected.add(rack);
+    collected.addAll(Arrays.asList(moreRacks));
+    this.racks = collected.build();
+  }
+
+  /**
+   * Convenience factory for {@link org.apache.twill.api.Racks}, equivalent to calling the varargs constructor.
+   * @param rack A rack to be added.
+   * @param moreRacks Additional racks to be added.
+   * @return An instance of {@link org.apache.twill.api.Racks} containing specified racks.
+   */
+  public static Racks of(String rack, String...moreRacks) {
+    return new Racks(rack, moreRacks);
+  }
+
+  /**
+   * @return the immutable set of racks held by this instance.
+   */
+  public Set<String> get() {
+    return racks;
+  }
+
+  @Override
+  public String toString() {
+    return racks.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java b/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
index 4b602bc..2865caf 100644
--- a/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
@@ -17,14 +17,8 @@
  */
 package org.apache.twill.api;
 
-import com.google.common.collect.Iterables;
 import org.apache.twill.internal.DefaultResourceSpecification;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
 /**
  * This interface provides specifications for resource requirements including set and get methods
  * for number of cores, amount of memory, and number of instances.
@@ -85,20 +79,6 @@ public interface ResourceSpecification {
   int getInstances();
 
   /**
-   * Returns the execution hosts, expects Fully Qualified Domain Names host + domain.
-   * This is a suggestion for the scheduler depending on cluster load it may ignore it
-   * @return An array containing the hosts where the containers should run
-   */
-  List<String> getHosts();
-
-  /**
-   * Returns the execution racks.
-   * This is a suggestion for the scheduler depending on cluster load it may ignore it
-   * @return An array containing the racks where the containers should run
-   */
-  List<String> getRacks();
-
-  /**
    * Builder for creating {@link ResourceSpecification}.
    */
   static final class Builder {
@@ -108,8 +88,6 @@ public interface ResourceSpecification {
     private int uplink = -1;
     private int downlink = -1;
     private int instances = 1;
-    private List<String> hosts = new LinkedList<String>();
-    private List<String> racks = new LinkedList<String>();
 
     public static CoreSetter with() {
       return new Builder().new CoreSetter();
@@ -150,42 +128,10 @@ public interface ResourceSpecification {
     }
 
     public final class AfterUplink extends Build {
-      public AfterDownlink setDownlink(int downlink, SizeUnit unit) {
+      public Done setDownlink(int downlink, SizeUnit unit) {
         Builder.this.downlink = downlink * unit.multiplier;
-        return new AfterDownlink();
-      }
-    }
-
-    public final class AfterHosts extends Build {
-      public Done setRacks(String... racks) {
-        if (racks != null) {
-          Builder.this.racks = Arrays.asList(racks);
-        }
         return new Done();
       }
-
-      public Done setRacks(Iterable<String> racks) {
-        if (racks != null) {
-          Iterables.addAll(Builder.this.racks, racks);
-        }
-        return new Done();
-      }
-    }
-
-    public final class AfterDownlink extends Build {
-      public AfterHosts setHosts(String... hosts) {
-        if (hosts != null) {
-          Builder.this.hosts = Arrays.asList(hosts);
-        }
-        return new AfterHosts();
-      }
-
-      public AfterHosts setHosts(Iterable<String> hosts) {
-        if (hosts != null) {
-          Iterables.addAll(Builder.this.hosts, hosts);
-        }
-        return new AfterHosts();
-      }
     }
 
     public final class Done extends Build {
@@ -193,7 +139,7 @@ public interface ResourceSpecification {
 
     public abstract class Build {
       public ResourceSpecification build() {
-        return new DefaultResourceSpecification(cores, memory, instances, uplink, downlink, hosts, racks);
+        return new DefaultResourceSpecification(cores, memory, instances, uplink, downlink);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java b/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
index 3931d04..ed37190 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
@@ -60,6 +60,46 @@ public interface TwillSpecification {
   }
 
   /**
+   * Defines a container placement policy that applies to a set of runnables.
+   */
+  interface PlacementPolicy {
+
+    /**
+     * Lists different types of Placement Policies available.
+     */
+    enum Type {
+      /**
+       * Runnables should be scattered over different hosts.
+       */
+      DISTRIBUTED,
+      /**
+       * No specific placement policy; the builder also uses it for policies that only name hosts and/or racks.
+       */
+      DEFAULT
+    }
+
+    /**
+     * @return Set of {@link org.apache.twill.api.TwillRunnable} names that belong to this placement policy.
+     */
+    Set<String> getNames();
+
+    /**
+     * @return {@link org.apache.twill.api.TwillSpecification.PlacementPolicy.Type Type} of this placement policy.
+     */
+    Type getType();
+
+    /**
+     * @return Set of hosts associated with this placement policy (may be empty).
+     */
+    Set<String> getHosts();
+
+    /**
+     * @return Set of racks associated with this placement policy (may be empty).
+     */
+    Set<String> getRacks();
+  }
+
+  /**
    * @return Name of the application.
    */
   String getName();
@@ -75,6 +115,11 @@ public interface TwillSpecification {
   List<Order> getOrders();
 
   /**
+   * @return Returns all {@link org.apache.twill.api.TwillSpecification.PlacementPolicy} for this application.
+   */
+  List<PlacementPolicy> getPlacementPolicies();
+
+  /**
    * @return The {@link EventHandlerSpecification} for the {@link EventHandler} to be used for this application,
    *         or {@code null} if no event handler has been provided.
    */
@@ -89,6 +134,7 @@ public interface TwillSpecification {
     private String name;
     private Map<String, RuntimeSpecification> runnables = Maps.newHashMap();
     private List<Order> orders = Lists.newArrayList();
+    private List<PlacementPolicy> placementPolicies = Lists.newArrayList();
     private EventHandlerSpecification eventHandler;
 
     public static NameSetter with() {
@@ -128,6 +174,8 @@ public interface TwillSpecification {
       FirstOrder withOrder();
 
       AfterOrder anyOrder();
+
+      PlacementPolicySetter withPlacementPolicy();
     }
 
     public final class RunnableSetter implements MoreRunnable, AfterRunnable {
@@ -170,6 +218,11 @@ public interface TwillSpecification {
       public AfterOrder anyOrder() {
         return new OrderSetter();
       }
+
+      @Override
+      public PlacementPolicySetter withPlacementPolicy() {
+        return new PlacementPolicySetter();
+      }
     }
 
     /**
@@ -252,6 +305,125 @@ public interface TwillSpecification {
       }
     }
 
+    /**
+     * Interface to add placement policies to the application.
+     */
+    public interface MorePlacementPolicies {
+
+      /**
+       * Specify hosts for a list of runnables.
+       * @param hosts {@link org.apache.twill.api.Hosts} specifying a set of hosts.
+       * @param runnableName name of the first runnable the policy applies to.
+       * @param runnableNames names of any further runnables the policy applies to.
+       * @return A reference to either add more placement policies or skip to defining execution order.
+       */
+      PlacementPolicySetter add(Hosts hosts, String runnableName, String... runnableNames);
+
+      /**
+       * Specify racks for a list of runnables.
+       * @param racks {@link org.apache.twill.api.Racks} specifying a set of racks.
+       * @param runnableName name of the first runnable the policy applies to.
+       * @param runnableNames names of any further runnables the policy applies to.
+       * @return A reference to either add more placement policies or skip to defining execution order.
+       */
+      PlacementPolicySetter add(Racks racks, String runnableName, String... runnableNames);
+
+      /**
+       * Specify hosts and racks for a list of runnables.
+       * @param hosts {@link org.apache.twill.api.Hosts} specifying a set of hosts.
+       * @param racks {@link org.apache.twill.api.Racks} specifying a set of racks.
+       * @param runnableName name of the first runnable the policy applies to.
+       * @param runnableNames names of any further runnables the policy applies to.
+       * @return A reference to either add more placement policies or skip to defining execution order.
+       */
+      PlacementPolicySetter add(Hosts hosts, Racks racks, String runnableName, String... runnableNames);
+
+      /**
+       * Specify a placement policy for a list of runnables.
+       * @param type {@link PlacementPolicy.Type} specifying a specific placement policy type.
+       * @param runnableName name of the first runnable the policy applies to.
+       * @param runnableNames names of any further runnables the policy applies to.
+       * @return A reference to either add more placement policies or skip to defining execution order.
+       */
+      PlacementPolicySetter add(PlacementPolicy.Type type, String runnableName, String... runnableNames);
+    }
+
+    /**
+     * Interface to define execution order after adding placement policies.
+     */
+    public interface AfterPlacementPolicy {
+      /**
+       * Starts defining execution order; returns the {@link FirstOrder} step of the builder.
+       */
+      FirstOrder withOrder();
+
+      /**
+       * Declares that no particular execution order is needed; returns the {@link AfterOrder} step.
+       */
+      AfterOrder anyOrder();
+    }
+
+    public final class PlacementPolicySetter implements MorePlacementPolicies, AfterPlacementPolicy {
+
+      @Override
+      public PlacementPolicySetter add(Hosts hosts, String runnableName, String... runnableNames) {
+        return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, hosts, null, runnableName, runnableNames);
+      }
+
+      @Override
+      public PlacementPolicySetter add(Racks racks, String runnableName, String... runnableNames) {
+        return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, null, racks, runnableName, runnableNames);
+      }
+
+      @Override
+      public PlacementPolicySetter add(Hosts hosts, Racks racks, String runnableName, String... runnableNames) {
+        return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, hosts, racks, runnableName, runnableNames);
+      }
+
+      @Override
+      public PlacementPolicySetter add(PlacementPolicy.Type type, String runnableName, String...runnableNames) {
+        return addPlacementPolicy(type, null, null, runnableName, runnableNames);
+      }
+
+      private PlacementPolicySetter addPlacementPolicy(PlacementPolicy.Type type, Hosts hosts, Racks racks,
+                                                       String runnableName, String...runnableNames) {
+        Set<String> members = Sets.newHashSet(checkAssignable(runnableName));
+        for (String additional : runnableNames) {
+          members.add(checkAssignable(additional));
+        }
+        placementPolicies.add(new DefaultTwillSpecification.DefaultPlacementPolicy(members, type, hosts, racks));
+        return this;
+      }
+
+      /** Checks that name is non-null, names an added runnable and is in no policy yet; returns it. */
+      private String checkAssignable(String name) {
+        Preconditions.checkArgument(name != null, "Name cannot be null.");
+        Preconditions.checkArgument(runnables.containsKey(name), "Runnable not exists.");
+        Preconditions.checkArgument(!contains(name),
+                                    "Runnable (" + name + ") cannot belong to more than one Placement Policy");
+        return name;
+      }
+
+      private boolean contains(String runnableName) {
+        for (PlacementPolicy existing : placementPolicies) {
+          if (existing.getNames().contains(runnableName)) {
+            return true;
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public FirstOrder withOrder() {
+        return new OrderSetter();
+      }
+
+      @Override
+      public AfterOrder anyOrder() {
+        return new OrderSetter();
+      }
+    }
+
     public interface FirstOrder {
       NextOrder begin(String name, String...names);
     }
@@ -303,8 +475,7 @@ public interface TwillSpecification {
 
         // For all unordered runnables, add it to the end of orders list
         orders.add(new DefaultTwillSpecification.DefaultOrder(runnableNames, Order.Type.STARTED));
-
-        return new DefaultTwillSpecification(name, runnables, orders, eventHandler);
+        return new DefaultTwillSpecification(name, runnables, orders, placementPolicies, eventHandler);
       }
 
       private void addOrder(final Order.Type type, String name, String...names) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
index 2998165..1327ce5 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
@@ -17,12 +17,8 @@
  */
 package org.apache.twill.internal;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.twill.api.ResourceSpecification;
 
-import java.util.Collections;
-import java.util.List;
-
 /**
  * Straightforward implementation of {@link org.apache.twill.api.ResourceSpecification}.
  */
@@ -32,24 +28,13 @@ public final class DefaultResourceSpecification implements ResourceSpecification
   private final int instances;
   private final int uplink;
   private final int downlink;
-  private final List<String> hosts;
-  private final List<String> racks;
 
   public DefaultResourceSpecification(int virtualCores, int memorySize, int instances, int uplink, int downlink) {
-    this(virtualCores, memorySize, instances, uplink, downlink,
-            Collections.<String>emptyList(), Collections.<String>emptyList());
-  }
-
-  public DefaultResourceSpecification(int virtualCores, int memorySize, int instances,
-                                      int uplink, int downlink,
-                                      List<String> hosts, List<String> racks) {
     this.virtualCores = virtualCores;
     this.memorySize = memorySize;
     this.instances = instances;
     this.uplink = uplink;
     this.downlink = downlink;
-    this.hosts = ImmutableList.copyOf(hosts);
-    this.racks = ImmutableList.copyOf(racks);
   }
 
   @Deprecated
@@ -74,16 +59,6 @@ public final class DefaultResourceSpecification implements ResourceSpecification
   }
 
   @Override
-  public List<String> getHosts() {
-    return this.hosts;
-  }
-
-  @Override
-  public List<String> getRacks() {
-    return this.racks;
-  }
-
-  @Override
   public int getUplink() {
     return uplink;
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
index fdb8b32..184afb3 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
@@ -22,9 +22,12 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.Hosts;
+import org.apache.twill.api.Racks;
 import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.api.TwillSpecification;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,13 +41,16 @@ public final class DefaultTwillSpecification implements TwillSpecification {
   private final String name;
   private final Map<String, RuntimeSpecification> runnables;
   private final List<Order> orders;
+  private final List<PlacementPolicy> placementPolicies;
   private final EventHandlerSpecification eventHandler;
 
   public DefaultTwillSpecification(String name, Map<String, RuntimeSpecification> runnables,
-                                   List<Order> orders, EventHandlerSpecification eventHandler) {
+                                   List<Order> orders, List<PlacementPolicy> placementPolicies,
+                                   EventHandlerSpecification eventHandler) {
     this.name = name;
     this.runnables = ImmutableMap.copyOf(runnables);
     this.orders = ImmutableList.copyOf(orders);
+    this.placementPolicies = ImmutableList.copyOf(placementPolicies); // immutable snapshot, same as orders
     this.eventHandler = eventHandler;
   }
 
@@ -63,6 +69,11 @@ public final class DefaultTwillSpecification implements TwillSpecification {
     return orders;
   }
 
+  @Override
+  public List<PlacementPolicy> getPlacementPolicies() {
+    return placementPolicies;
+  }
+
   @Nullable
   @Override
   public EventHandlerSpecification getEventHandler() {
@@ -100,4 +111,77 @@ public final class DefaultTwillSpecification implements TwillSpecification {
         .toString();
     }
   }
+
+  /**
+   * Straightforward implementation of {@link org.apache.twill.api.TwillSpecification.PlacementPolicy}.
+   */
+  public static final class DefaultPlacementPolicy implements PlacementPolicy {
+
+    private final Set<String> names;
+    private final Type type;
+    private final Hosts hosts;
+    private final Racks racks;
+
+    public DefaultPlacementPolicy(Iterable<String> names, Type type, Hosts hosts, Racks racks) {
+      this.names = ImmutableSet.copyOf(names);
+      this.type = type;
+      this.hosts = hosts;
+      this.racks = racks;
+    }
+
+    public DefaultPlacementPolicy(Iterable<String> names, Type type) {
+      this(names, type, null, null);  // null hosts/racks are reported as empty sets by the getters
+    }
+
+    /**
+     * @return Set of {@link org.apache.twill.api.TwillRunnable} names that belong to this placement policy.
+     */
+    @Override
+    public Set<String> getNames() {
+      return names;
+    }
+
+    /**
+     * @return {@link org.apache.twill.api.TwillSpecification.PlacementPolicy.Type Type} of this placement policy.
+     */
+    @Override
+    public Type getType() {
+      return type;
+    }
+
+    /**
+     * @return Set of hosts associated with this placement policy; an empty set if no {@link Hosts} was supplied.
+     */
+    @Override
+    public Set<String> getHosts() {
+      if (this.hosts == null) {
+        return Collections.emptySet();
+      }
+      return this.hosts.get();
+    }
+
+    /**
+     * @return Set of racks associated with this placement policy; an empty set if no {@link Racks} was supplied.
+     */
+    @Override
+    public Set<String> getRacks() {
+      if (this.racks == null) {
+        return Collections.emptySet();
+      }
+      return this.racks.get();
+    }
+
+    /**
+     * @return String representation of this placement policy, listing its names, type, hosts and racks.
+     */
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+        .add("names", names)
+        .add("type", type)
+        .add("hosts", hosts)
+        .add("racks", racks)
+        .toString();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index 9912638..fbc6e70 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -31,6 +31,12 @@ public final class Constants {
 
   public static final long PROVISION_TIMEOUT = 30000;
 
+  /**
+   * Milliseconds AM should wait for RM to allocate a constrained provision request.
+   * On timeout, AM relaxes the request constraints.
+   */
+  public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
+
   /** Memory size of AM. */
   public static final int APP_MASTER_MEMORY_MB = 512;
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
index 5f7d7ae..412e406 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
@@ -44,8 +44,6 @@ final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecifi
     json.addProperty("instances", src.getInstances());
     json.addProperty("uplink", src.getUplink());
     json.addProperty("downlink", src.getDownlink());
-    json.add("hosts", context.serialize(src.getHosts()));
-    json.add("racks", context.serialize(src.getRacks()));
     return json;
   }
 
@@ -59,9 +57,7 @@ final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecifi
                                             jsonObj.get("memorySize").getAsInt(),
                                             jsonObj.get("instances").getAsInt(),
                                             jsonObj.get("uplink").getAsInt(),
-                                            jsonObj.get("downlink").getAsInt(),
-                                            hosts,
-                                            racks);
+                                            jsonObj.get("downlink").getAsInt());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
index 67c15a2..d882096 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
@@ -36,6 +36,7 @@ import org.apache.twill.api.TwillRunnableSpecification;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder;
 import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder;
+import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationPlacementPolicyCoder;
 
 import java.io.File;
 import java.io.IOException;
@@ -61,6 +62,8 @@ public final class TwillSpecificationAdapter {
               .serializeNulls()
               .registerTypeAdapter(TwillSpecification.class, new TwillSpecificationCodec())
               .registerTypeAdapter(TwillSpecification.Order.class, new TwillSpecificationOrderCoder())
+              .registerTypeAdapter(TwillSpecification.PlacementPolicy.class,
+                                   new TwillSpecificationPlacementPolicyCoder())
               .registerTypeAdapter(EventHandlerSpecification.class, new EventHandlerSpecificationCoder())
               .registerTypeAdapter(RuntimeSpecification.class, new RuntimeSpecificationCodec())
               .registerTypeAdapter(TwillRunnableSpecification.class, new TwillRunnableSpecificationCodec())

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
index d46434a..96df950 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
@@ -26,6 +26,8 @@ import com.google.gson.JsonParseException;
 import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
 import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.Hosts;
+import org.apache.twill.api.Racks;
 import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.internal.DefaultEventHandlerSpecification;
@@ -50,6 +52,8 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
                                             new TypeToken<Map<String, RuntimeSpecification>>() { }.getType()));
     json.add("orders", context.serialize(src.getOrders(),
                                          new TypeToken<List<TwillSpecification.Order>>() { }.getType()));
+    json.add("placementPolicies", context.serialize(
+      src.getPlacementPolicies(), new TypeToken<List<TwillSpecification.PlacementPolicy>>() { }.getType()));
     EventHandlerSpecification eventHandler = src.getEventHandler();
     if (eventHandler != null) {
       json.add("handler", context.serialize(eventHandler, EventHandlerSpecification.class));
@@ -68,6 +72,8 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
       jsonObj.get("runnables"), new TypeToken<Map<String, RuntimeSpecification>>() { }.getType());
     List<TwillSpecification.Order> orders = context.deserialize(
       jsonObj.get("orders"), new TypeToken<List<TwillSpecification.Order>>() { }.getType());
+    List<TwillSpecification.PlacementPolicy> placementPolicies = context.deserialize(
+      jsonObj.get("placementPolicies"), new TypeToken<List<TwillSpecification.PlacementPolicy>>() { }.getType());
 
     JsonElement handler = jsonObj.get("handler");
     EventHandlerSpecification eventHandler = null;
@@ -75,7 +81,7 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
       eventHandler = context.deserialize(handler, EventHandlerSpecification.class);
     }
 
-    return new DefaultTwillSpecification(name, runnables, orders, eventHandler);
+    return new DefaultTwillSpecification(name, runnables, orders, placementPolicies, eventHandler);
   }
 
   static final class TwillSpecificationOrderCoder implements JsonSerializer<TwillSpecification.Order>,
@@ -101,6 +107,34 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
     }
   }
 
+  static final class TwillSpecificationPlacementPolicyCoder implements
+    JsonSerializer<TwillSpecification.PlacementPolicy>, JsonDeserializer<TwillSpecification.PlacementPolicy> {
+
+    @Override
+    public JsonElement serialize(TwillSpecification.PlacementPolicy src, Type typeOfSrc,
+                                 JsonSerializationContext context) {
+      JsonObject json = new JsonObject();
+      json.add("names", context.serialize(src.getNames(), new TypeToken<Set<String>>() { }.getType()));
+      json.addProperty("type", src.getType().name());
+      json.add("hosts", context.serialize(src.getHosts(), new TypeToken<Set<String>>() { }.getType()));
+      json.add("racks", context.serialize(src.getRacks(), new TypeToken<Set<String>>() { }.getType()));
+      return json;
+    }
+
+    @Override
+    public TwillSpecification.PlacementPolicy deserialize(JsonElement json, Type typeOfT,
+                                                          JsonDeserializationContext context)
+      throws JsonParseException {
+      JsonObject jsonObj = json.getAsJsonObject();
+      Set<String> names = context.deserialize(jsonObj.get("names"), new TypeToken<Set<String>>() { }.getType());
+      TwillSpecification.PlacementPolicy.Type type =
+        TwillSpecification.PlacementPolicy.Type.valueOf(jsonObj.get("type").getAsString());
+      Set<String> hosts = context.deserialize(jsonObj.get("hosts"), new TypeToken<Set<String>>() { }.getType());
+      Set<String> racks = context.deserialize(jsonObj.get("racks"), new TypeToken<Set<String>>() { }.getType());
+      return new DefaultTwillSpecification.DefaultPlacementPolicy(names, type, new Hosts(hosts), new Racks(racks));
+    }
+  }
+
   static final class EventHandlerSpecificationCoder implements JsonSerializer<EventHandlerSpecification>,
                                                                JsonDeserializer<EventHandlerSpecification> {
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java b/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
index e8b0eff..2e949d4 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
@@ -26,8 +26,6 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.unitils.reflectionassert.ReflectionAssert;
 
-import java.util.Arrays;
-
 /**
  * Maybe this checkstyle rule needs to be removed.
  */
@@ -45,13 +43,10 @@ public class ResourceSpecificationCodecTest {
                     "\"memorySize\":1024," +
                     "\"instances\":2," +
                     "\"uplink\":100," +
-                    "\"downlink\":100," +
-                    "\"hosts\":[\"one1\",\"two2\"]," +
-                    "\"racks\":[\"three3\"]" +
+                    "\"downlink\":100" +
             "}";
     final ResourceSpecification expected =
-            new DefaultResourceSpecification(2, 1024, 2, 100, 100,
-                    Arrays.asList("one1", "two2"), Arrays.asList("three3"));
+            new DefaultResourceSpecification(2, 1024, 2, 100, 100);
     final String actualString = gson.toJson(expected);
     Assert.assertEquals(expectedString, actualString);
 
@@ -71,12 +66,9 @@ public class ResourceSpecificationCodecTest {
             .setInstances(3)
             .setUplink(10, ResourceSpecification.SizeUnit.GIGA)
             .setDownlink(5, ResourceSpecification.SizeUnit.GIGA)
-            .setHosts("a1", "b2", "c3")
-            .setRacks("r2")
             .build();
     final DefaultResourceSpecification expected =
-            new DefaultResourceSpecification(5, 4096, 3, 10240, 5120,
-                    Arrays.asList("a1", "b2", "c3"), Arrays.asList("r2"));
+            new DefaultResourceSpecification(5, 4096, 3, 10240, 5120);
     ReflectionAssert.assertLenientEquals(expected, actual);
   }
 
@@ -88,12 +80,9 @@ public class ResourceSpecificationCodecTest {
             .setInstances(3)
             .setUplink(10, ResourceSpecification.SizeUnit.GIGA)
             .setDownlink(5, ResourceSpecification.SizeUnit.GIGA)
-            .setHosts(Arrays.asList("a1", "b2", "c3"))
-            .setRacks(Arrays.asList("r2"))
             .build();
     final DefaultResourceSpecification expected =
-            new DefaultResourceSpecification(5, 4096, 3, 10240, 5120,
-                    Arrays.asList("a1", "b2", "c3"), Arrays.asList("r2"));
+            new DefaultResourceSpecification(5, 4096, 3, 10240, 5120);
     ReflectionAssert.assertLenientEquals(expected, actual);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
index 9b66f67..a990cc0 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -100,6 +100,11 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
   }
 
   @Override
+  public int getNMPort() {
+    return Integer.parseInt(System.getenv().get(ApplicationConstants.NM_PORT_ENV));
+  }
+
+  @Override
   protected Resource adjustCapability(Resource resource) {
     int cores = YarnUtils.getVirtualCores(resource);
     int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
@@ -123,7 +128,9 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
 
   @Override
   protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
-                                                               @Nullable String[] hosts, @Nullable String[] racks) {
+                                                               @Nullable String[] hosts, @Nullable String[] racks,
+                                                               boolean relaxLocality) {
+    // Ignore relaxLocality param since the corresponding support is not present in Hadoop 2.0.
     return new AMRMClient.ContainerRequest(capability, hosts, racks, priority, 1);
   }
 
@@ -138,6 +145,14 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
   }
 
   @Override
+  protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    // An empty implementation since Blacklist is not supported in Hadoop-2.0.
+    if (recordUnsupportedFeature("blacklist")) {
+      LOG.warn("Blacklist is not supported in Hadoop 2.0");
+    }
+  }
+
+  @Override
   protected AllocateResult doAllocate(float progress) throws Exception {
     AllocationResponse response = amrmClient.allocate(progress);
     List<RunnableProcessLauncher> launchers

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
index 6fdd99e..3f7422d 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.YarnClient;
 import org.apache.hadoop.yarn.client.YarnClientImpl;
@@ -45,6 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.List;
 
 /**
  *
@@ -156,6 +158,11 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
   }
 
   @Override
+  public List<NodeReport> getNodeReports() throws Exception {
+    return this.yarnClient.getNodeReports();
+  }
+
+  @Override
   protected void startUp() throws Exception {
     yarnClient.start();
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
index 78a5135..c5ac458 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -42,11 +42,11 @@ import javax.annotation.Nullable;
 /**
  *
  */
-public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
+public class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
 
   private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAMClient.class);
 
-  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
+  protected static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
 
   static {
     STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
@@ -57,9 +57,9 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
     };
   }
 
-  private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
-  private final Hadoop21YarnNMClient nmClient;
-  private Resource maxCapability;
+  protected final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
+  protected final Hadoop21YarnNMClient nmClient;
+  protected Resource maxCapability;
 
   public Hadoop21YarnAMClient(Configuration conf) {
     super(ApplicationConstants.Environment.CONTAINER_ID.name());
@@ -95,9 +95,15 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
   }
 
   @Override
+  public int getNMPort() {
+    return Integer.parseInt(System.getenv().get(ApplicationConstants.Environment.NM_PORT.name()));
+  }
+
+  @Override
   protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
-                                                               @Nullable String[] hosts, @Nullable String[] racks) {
-    return new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
+                                                               @Nullable String[] hosts, @Nullable String[] racks,
+                                                               boolean relaxLocality) {
+    return new AMRMClient.ContainerRequest(capability, hosts, racks, priority, relaxLocality);
   }
 
   @Override
@@ -111,6 +117,14 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
   }
 
   @Override
+  protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    // An empty implementation since Blacklist is not supported in Hadoop-2.1 AMRMClient.
+    if (recordUnsupportedFeature("blacklist")) {
+      LOG.warn("Blacklist is not supported in Hadoop 2.1 AMRMClient");
+    }
+  }
+
+  @Override
   protected AllocateResult doAllocate(float progress) throws Exception {
     AllocateResponse allocateResponse = amrmClient.allocate(progress);
     List<RunnableProcessLauncher> launchers

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index 20558bb..9b70a86 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@@ -41,6 +42,8 @@ import org.apache.twill.internal.appmaster.ApplicationSubmitter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 /**
  *
  */
@@ -136,6 +139,11 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
   }
 
   @Override
+  public List<NodeReport> getNodeReports() throws Exception {
+    return this.yarnClient.getNodeReports();
+  }
+
+  @Override
   protected void startUp() throws Exception {
     yarnClient.start();
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
new file mode 100644
index 0000000..3ee99e8
--- /dev/null
+++ b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * AM client for Hadoop version 2.2 or greater; extends {@link Hadoop21YarnAMClient} with blacklist support.
+ */
+public final class Hadoop22YarnAMClient extends Hadoop21YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop22YarnAMClient.class);
+
+  public Hadoop22YarnAMClient(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    LOG.debug("Blacklist Additions: {} , Blacklist Removals: {}", blacklistAdditions, blacklistRemovals);
+    amrmClient.updateBlacklist(blacklistAdditions, blacklistRemovals);  // handed to the inherited AMRMClient
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
new file mode 100644
index 0000000..38cb32c
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * This class defines how the containers should be allocated.
+ */
+public class AllocationSpecification {
+
+  /**
+   * Defines different types of allocation strategies.
+   */
+  enum Type {
+    ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+    DEFAULT
+  }
+
+  /**
+   * Resource specification of runnables.
+   */
+  private final Resource resource;
+
+  /**
+   * Allocation strategy Type.
+   */
+  private final Type type;
+
+  /**
+   * Name of runnable. Set to null if the class represents more than one runnable.
+   */
+  private final String runnableName;
+
+  /**
+   * Instance number for the runnable. Set to 0 if the class represents more than one instance / runnable.
+   */
+  private final int instanceId;
+
+  public AllocationSpecification(Resource resource) {
+    this(resource, Type.DEFAULT, null, 0);
+  }
+
+  public AllocationSpecification(Resource resource, Type type, String runnableName, int instanceId) {
+    this.resource = resource;
+    this.type = type;
+    this.runnableName = runnableName;
+    this.instanceId = instanceId;
+  }
+
+  public Resource getResource() {
+    return this.resource;
+  }
+
+  public Type getType() {
+    return this.type;
+  }
+
+  public String getRunnableName() {
+    return this.runnableName;
+  }
+
+  public int getInstanceId() {
+    return this.instanceId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // instanceof is false for null, so this check also rejects a null argument.
+    if (!(obj instanceof AllocationSpecification)) {
+      return false;
+    }
+    AllocationSpecification other = (AllocationSpecification) obj;
+    return (instanceId == other.instanceId) &&
+      Objects.equal(resource, other.resource) &&
+      Objects.equal(type, other.type) &&
+      Objects.equal(runnableName, other.runnableName);
+  }
+
+  @Override
+  public int hashCode() {
+    // Null-safe hash over fields that equals() compares. The enum type is skipped on purpose: enum hash codes
+    // are identity-based and vary between JVM runs; leaving a field out still satisfies the equals/hashCode contract.
+    return Objects.hashCode(resource, runnableName, instanceId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index c25a002..b4f9997 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -25,7 +25,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -64,6 +63,7 @@ import org.apache.twill.filesystem.Location;
 import org.apache.twill.internal.AbstractTwillService;
 import org.apache.twill.internal.Configs;
 import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ContainerInfo;
 import org.apache.twill.internal.DefaultTwillRunResources;
 import org.apache.twill.internal.EnvKeys;
 import org.apache.twill.internal.JvmOptions;
@@ -129,6 +129,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
   private final int reservedMemory;
   private final EventHandler eventHandler;
   private final Location applicationLocation;
+  private final PlacementPolicyManager placementPolicyManager;
 
   private EmbeddedKafkaServer kafkaServer;
   private Queue<RunnableContainerRequest> runnableContainerRequests;
@@ -146,6 +147,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
     this.credentials = createCredentials();
     this.jvmOpts = loadJvmOptions();
     this.reservedMemory = getReservedMemory();
+    this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies());
 
     amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
                                                    Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
@@ -348,7 +350,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
 
   private void doRun() throws Exception {
     // The main loop
-    Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> currentRequest = null;
+    Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> currentRequest = null;
     final Queue<ProvisionRequest> provisioning = Lists.newLinkedList();
 
     YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
@@ -363,6 +365,8 @@ public final class ApplicationMasterService extends AbstractTwillService {
       }
     };
 
+    long requestStartTime = 0;
+    boolean isRequestRelaxed = false;
     long nextTimeoutCheck = System.currentTimeMillis() + Constants.PROVISION_TIMEOUT;
     while (isRunning()) {
       // Call allocate. It has to be made at first in order to be able to get cluster resource availability.
@@ -383,10 +387,25 @@ public final class ApplicationMasterService extends AbstractTwillService {
           runnableContainerRequests.poll();
         }
       }
+
       // Nothing in provision, makes the next batch of provision request
       if (provisioning.isEmpty() && currentRequest != null) {
-        addContainerRequests(currentRequest.getKey(), currentRequest.getValue(), provisioning);
+        manageBlacklist(currentRequest);
+        addContainerRequests(currentRequest.getKey().getResource(), currentRequest.getValue(), provisioning,
+                             currentRequest.getKey().getType());
         currentRequest = null;
+        requestStartTime = System.currentTimeMillis();
+        isRequestRelaxed = false;
+      }
+
+      // Check for provision request timeout i.e. check if any provision request has been pending
+      // for more than the designated time. On timeout, relax the request constraints.
+      if (!provisioning.isEmpty() && !isRequestRelaxed &&
+        (System.currentTimeMillis() - requestStartTime) > Constants.CONSTRAINED_PROVISION_REQUEST_TIMEOUT) {
+        LOG.info("Relaxing provisioning constraints for request {}", provisioning.peek().getRequestId());
+        // Clear the blacklist for the pending provision request(s).
+        clearBlacklist();
+        isRequestRelaxed = true;
       }
 
       nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);
@@ -398,6 +417,37 @@ public final class ApplicationMasterService extends AbstractTwillService {
   }
 
   /**
+   * Manage Blacklist for a given request.
+   */
+  private void manageBlacklist(Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> request) {
+    clearBlacklist();
+
+    //Check the allocation strategy
+    AllocationSpecification currentAllocationSpecification = request.getKey();
+    if (currentAllocationSpecification.getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
+
+      //Check the placement policy
+      TwillSpecification.PlacementPolicy placementPolicy =
+        placementPolicyManager.getPlacementPolicy(currentAllocationSpecification.getRunnableName());
+      if (placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+
+        //Update blacklist with hosts which are running DISTRIBUTED runnables
+        for (String runnable : placementPolicyManager.getFellowRunnables(request.getKey().getRunnableName())) {
+          Collection<ContainerInfo> containerStats =
+            runningContainers.getContainerInfo(runnable);
+          for (ContainerInfo containerInfo : containerStats) {
+            // Yarn Resource Manager may include port in the node name depending on the setting
+            // YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME. It is safe to add both
+            // the names (with and without port) to the blacklist.
+            addToBlacklist(containerInfo.getHost().getHostName());
+            addToBlacklist(containerInfo.getHost().getHostName() + ":" + containerInfo.getPort());
+          }
+        }
+      }
+    }
+  }
+
+  /**
    * Handling containers that are completed.
    */
   private void handleCompleted(List<YarnContainerStatus> completedContainersStatuses) {
@@ -433,13 +483,16 @@ public final class ApplicationMasterService extends AbstractTwillService {
     // Invoke event handler for provision request timeout
     Map<String, ExpectedContainers.ExpectedCount> expiredRequests = expectedContainers.getAll();
     Map<String, Integer> runningCounts = runningContainers.countAll();
+    Map<String, Integer> completedContainerCount = runningContainers.getCompletedContainerCount();
 
     List<EventHandler.TimeoutEvent> timeoutEvents = Lists.newArrayList();
     for (Map.Entry<String, ExpectedContainers.ExpectedCount> entry : expiredRequests.entrySet()) {
       String runnableName = entry.getKey();
       ExpectedContainers.ExpectedCount expectedCount = entry.getValue();
       int runningCount = runningCounts.containsKey(runnableName) ? runningCounts.get(runnableName) : 0;
-      if (expectedCount.getCount() != runningCount) {
+      int completedCount = completedContainerCount.containsKey(runnableName) ?
+        completedContainerCount.get(runnableName) : 0;
+      if (expectedCount.getCount() > runningCount + completedCount) {
         timeoutEvents.add(new EventHandler.TimeoutEvent(runnableName, expectedCount.getCount(),
                                                                    runningCount, expectedCount.getTimestamp()));
       }
@@ -488,42 +541,101 @@ public final class ApplicationMasterService extends AbstractTwillService {
   private Queue<RunnableContainerRequest> initContainerRequests() {
     // Orderly stores container requests.
     Queue<RunnableContainerRequest> requests = Lists.newLinkedList();
-    // For each order in the twillSpec, create container request for each runnable.
+    // For each order in the twillSpec, create container request for runnables, depending on Placement policy.
     for (TwillSpecification.Order order : twillSpec.getOrders()) {
-      // Group container requests based on resource requirement.
-      ImmutableMultimap.Builder<Resource, RuntimeSpecification> builder = ImmutableMultimap.builder();
-      for (String runnableName : order.getNames()) {
+      Set<String> distributedRunnables = placementPolicyManager.getDistributedRunnables(order.getNames());
+      Set<String> defaultRunnables = Sets.newHashSet();
+      defaultRunnables.addAll(order.getNames());
+      defaultRunnables.removeAll(distributedRunnables);
+      Map<AllocationSpecification, Collection<RuntimeSpecification>> requestsMap = Maps.newHashMap();
+      for (String runnableName : distributedRunnables) {
         RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
         Resource capability = createCapability(runtimeSpec.getResourceSpecification());
-        builder.put(capability, runtimeSpec);
+        for (int instanceId = 0; instanceId < runtimeSpec.getResourceSpecification().getInstances(); instanceId++) {
+          AllocationSpecification allocationSpecification =
+            new AllocationSpecification(capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+                                        runnableName, instanceId);
+          addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+        }
       }
-      requests.add(new RunnableContainerRequest(order.getType(), builder.build()));
+      for (String runnableName : defaultRunnables) {
+        RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
+        Resource capability = createCapability(runtimeSpec.getResourceSpecification());
+        AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
+        addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+      }
+      requests.add(new RunnableContainerRequest(order.getType(), requestsMap));
     }
     return requests;
   }
 
   /**
+   * Helper method to add a runtime spec under its allocation spec in the map used to build a RunnableContainerRequest.
+   */
+  private void addAllocationSpecification(AllocationSpecification allocationSpecification,
+                                          Map<AllocationSpecification, Collection<RuntimeSpecification>> map,
+                                          RuntimeSpecification runtimeSpec) {
+    if (!map.containsKey(allocationSpecification)) {
+      map.put(allocationSpecification, Lists.<RuntimeSpecification>newLinkedList());
+    }
+    map.get(allocationSpecification).add(runtimeSpec);
+  }
+
+  /**
    * Adds container requests with the given resource capability for each runtime.
    */
   private void addContainerRequests(Resource capability,
                                     Collection<RuntimeSpecification> runtimeSpecs,
-                                    Queue<ProvisionRequest> provisioning) {
+                                    Queue<ProvisionRequest> provisioning,
+                                    AllocationSpecification.Type allocationType) {
     for (RuntimeSpecification runtimeSpec : runtimeSpecs) {
       String name = runtimeSpec.getName();
       int newContainers = expectedContainers.getExpected(name) - runningContainers.count(name);
       if (newContainers > 0) {
+        if (allocationType.equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
+          //Spawning 1 instance at a time
+          newContainers = 1;
+        }
+        TwillSpecification.PlacementPolicy placementPolicy = placementPolicyManager.getPlacementPolicy(name);
+        Set<String> hosts = Sets.newHashSet();
+        Set<String> racks = Sets.newHashSet();
+        if (placementPolicy != null) {
+          hosts = placementPolicy.getHosts();
+          racks = placementPolicy.getRacks();
+        }
         // TODO: Allow user to set priority?
-        LOG.info("Request {} container with capability {}", newContainers, capability);
+        LOG.info("Request {} container with capability {} for runnable {}", newContainers, capability, name);
         String requestId = amClient.addContainerRequest(capability, newContainers)
-                .addHosts(runtimeSpec.getResourceSpecification().getHosts())
-                .addRacks(runtimeSpec.getResourceSpecification().getRacks())
+                .addHosts(hosts)
+                .addRacks(racks)
                 .setPriority(0).apply();
-        provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers));
+        provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers, allocationType));
       }
     }
   }
 
   /**
+   * Add a resource to the blacklist.
+   */
+  private void addToBlacklist(String resource) {
+    amClient.addToBlacklist(resource);
+  }
+
+  /**
+   * Remove a resource from the blacklist.
+   */
+  private void removeFromBlacklist(String resource) {
+    amClient.removeFromBlacklist(resource);
+  }
+
+  /**
+   * Clears the resource blacklist.
+   */
+  private void clearBlacklist() {
+    amClient.clearBlacklist();
+  }
+
+  /**
    * Launches runnables in the provisioned containers.
    */
   private void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> launchers,
@@ -564,9 +676,12 @@ public final class ApplicationMasterService extends AbstractTwillService {
         amClient.completeContainerRequest(provisionRequest.getRequestId());
       }
 
+      if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName) ||
+        provisioning.peek().getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
+        provisioning.poll();
+      }
       if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName)) {
         LOG.info("Runnable " + runnableName + " fully provisioned with " + containerCount + " instances.");
-        provisioning.poll();
       }
     }
   }
@@ -708,7 +823,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
               }
             } else {
               // Increase the number of instances
-              runnableContainerRequests.add(createRunnableContainerRequest(runnableName));
+              runnableContainerRequests.add(createRunnableContainerRequest(runnableName, newCount - oldCount));
             }
           } finally {
             runningContainers.sendToRunnable(runnableName, message, completion);
@@ -723,6 +838,11 @@ public final class ApplicationMasterService extends AbstractTwillService {
   }
 
   private RunnableContainerRequest createRunnableContainerRequest(final String runnableName) {
+    return createRunnableContainerRequest(runnableName, 1);
+  }
+
+  private RunnableContainerRequest createRunnableContainerRequest(final String runnableName,
+                                                                  final int numberOfInstances) {
     // Find the current order of the given runnable in order to create a RunnableContainerRequest.
     TwillSpecification.Order order = Iterables.find(twillSpec.getOrders(), new Predicate<TwillSpecification.Order>() {
       @Override
@@ -733,7 +853,20 @@ public final class ApplicationMasterService extends AbstractTwillService {
 
     RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
     Resource capability = createCapability(runtimeSpec.getResourceSpecification());
-    return new RunnableContainerRequest(order.getType(), ImmutableMultimap.of(capability, runtimeSpec));
+    Map<AllocationSpecification, Collection<RuntimeSpecification>> requestsMap = Maps.newHashMap();
+    if (placementPolicyManager.getPlacementPolicyType(runnableName).equals(
+      TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+      for (int instanceId = 0; instanceId < numberOfInstances; instanceId++) {
+        AllocationSpecification allocationSpecification =
+          new AllocationSpecification(capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+                                      runnableName, instanceId);
+        addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+      }
+    } else {
+      AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
+      addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+    }
+    return new RunnableContainerRequest(order.getType(), requestsMap);
   }
 
   private Runnable getMessageCompletion(final String messageId, final SettableFuture<String> future) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
new file mode 100644
index 0000000..91a58bb
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.internal.appmaster;
+
+import com.google.common.collect.Sets;
+import org.apache.twill.api.TwillSpecification;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class provides helper functions for operating on a set of Placement Policies.
+ */
+public class PlacementPolicyManager {
+  List<TwillSpecification.PlacementPolicy> placementPolicies;
+
+  public PlacementPolicyManager(List<TwillSpecification.PlacementPolicy> placementPolicies) {
+    this.placementPolicies = placementPolicies;
+  }
+
+  /**
+   * Given a set of runnables, get all runnables which belong to DISTRIBUTED placement policies.
+   * @param givenRunnables Set of runnables.
+   * @return Subset of runnables, which belong to DISTRIBUTED placement policies.
+   */
+  public Set<String> getDistributedRunnables(Set<String> givenRunnables) {
+    Set<String> distributedRunnables = getAllDistributedRunnables();
+    distributedRunnables.retainAll(givenRunnables);
+    return distributedRunnables;
+  }
+
+  /**
+   * Given a runnable, get the type of placement policy. Returns DEFAULT if no placement policy is specified.
+   * @param runnableName Name of runnable.
+   * @return Placement policy type of the runnable.
+   */
+  public TwillSpecification.PlacementPolicy.Type getPlacementPolicyType(String runnableName) {
+    for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+      if (placementPolicy.getNames().contains(runnableName)) {
+        return placementPolicy.getType();
+      }
+    }
+    return TwillSpecification.PlacementPolicy.Type.DEFAULT;
+  }
+
+  /**
+   * Get all runnables which belong to the same Placement policy as the given runnable.
+   * @param runnableName Name of runnable.
+   * @return Set of runnables, with same placement policy.
+   */
+  public Set<String> getFellowRunnables(String runnableName) {
+    for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+      if (placementPolicy.getNames().contains(runnableName)) {
+        return placementPolicy.getNames();
+      }
+    }
+    return Collections.emptySet();
+  }
+
+  /**
+   * Get the placement policy of the runnable.
+   * Returns null if runnable does not belong to a placement policy.
+   * @param runnableName Name of runnable.
+   * @return Placement policy of the runnable.
+   */
+  public TwillSpecification.PlacementPolicy getPlacementPolicy(String runnableName) {
+    for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+      if (placementPolicy.getNames().contains(runnableName)) {
+        return placementPolicy;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Gets all runnables which belong to DISTRIBUTED placement policies.
+   */
+  private Set<String> getAllDistributedRunnables() {
+    Set<String> distributedRunnables = Sets.newHashSet();
+    for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+      if (placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+        distributedRunnables.addAll(placementPolicy.getNames());
+      }
+    }
+    return  distributedRunnables;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
index 002d2a5..a6eba3e 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
@@ -26,11 +26,18 @@ final class ProvisionRequest {
   private final RuntimeSpecification runtimeSpec;
   private final String requestId;
   private int requestCount;
+  private final AllocationSpecification.Type type;
 
   ProvisionRequest(RuntimeSpecification runtimeSpec, String requestId, int requestCount) {
+    this(runtimeSpec, requestId, requestCount, AllocationSpecification.Type.DEFAULT);
+  }
+
+  ProvisionRequest(RuntimeSpecification runtimeSpec, String requestId, int requestCount,
+                   AllocationSpecification.Type type) {
     this.runtimeSpec = runtimeSpec;
     this.requestId = requestId;
     this.requestCount = requestCount;
+    this.type = type;
   }
 
   RuntimeSpecification getRuntimeSpec() {
@@ -49,4 +56,8 @@ final class ProvisionRequest {
     requestCount--;
     return requestCount == 0;
   }
+
+  AllocationSpecification.Type getType() {
+    return this.type;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
index de199ad..2105629 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -20,7 +20,6 @@ package org.apache.twill.internal.appmaster;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.api.TwillSpecification;
@@ -34,12 +33,12 @@ import java.util.Map;
  */
 final class RunnableContainerRequest {
   private final TwillSpecification.Order.Type orderType;
-  private final Iterator<Map.Entry<Resource, Collection<RuntimeSpecification>>> requests;
+  private final Iterator<Map.Entry<AllocationSpecification, Collection<RuntimeSpecification>>> requests;
 
   RunnableContainerRequest(TwillSpecification.Order.Type orderType,
-                           Multimap<Resource, RuntimeSpecification> requests) {
+                           Map<AllocationSpecification, Collection<RuntimeSpecification>> requests) {
     this.orderType = orderType;
-    this.requests = requests.asMap().entrySet().iterator();
+    this.requests = requests.entrySet().iterator();
   }
 
   TwillSpecification.Order.Type getOrderType() {
@@ -51,8 +50,8 @@ final class RunnableContainerRequest {
    * @return The {@link Resource} and {@link Collection} of {@link RuntimeSpecification} or
    *         {@code null} if there is no more request.
    */
-  Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> takeRequest() {
-    Map.Entry<Resource, Collection<RuntimeSpecification>> next = Iterators.getNext(requests, null);
+  Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> takeRequest() {
+    Map.Entry<AllocationSpecification, Collection<RuntimeSpecification>> next = Iterators.getNext(requests, null);
     return next == null ? null : Maps.immutableEntry(next.getKey(), ImmutableList.copyOf(next.getValue()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
index 2126541..0e3aea5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
@@ -68,8 +68,9 @@ public final class RunnableProcessLauncher extends AbstractYarnProcessLauncher<Y
 
     launchContext.setEnvironment(env);
 
-    LOG.info("Launching in container {} at {}, {}",
-             containerInfo.getId(), containerInfo.getHost(), launchContext.getCommands());
+    LOG.info("Launching in container {} at {}:{}, {}",
+             containerInfo.getId(), containerInfo.getHost().getHostName(),
+             containerInfo.getPort(), launchContext.getCommands());
     final Cancellable cancellable = nmClient.start(containerInfo, launchContext);
     launched = true;
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index 5267616..383e5e0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -20,10 +20,13 @@ package org.apache.twill.internal.appmaster;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Table;
 import com.google.common.util.concurrent.FutureCallback;
@@ -53,6 +56,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
@@ -85,20 +89,24 @@ final class RunningContainers {
 
   // Map from runnableName to a BitSet, with the <instanceId> bit turned on for having an instance running.
   private final Map<String, BitSet> runnableInstances;
+  private final Map<String, Integer> completedContainerCount;
   private final DefaultResourceReport resourceReport;
   private final Deque<String> startSequence;
   private final Lock containerLock;
   private final Condition containerChange;
   private final ZKClient zkClient;
+  private final Multimap<String, ContainerInfo> containerStats;
 
   RunningContainers(String appId, TwillRunResources appMasterResources, ZKClient zookeeperClient) {
     containers = HashBasedTable.create();
     runnableInstances = Maps.newHashMap();
+    completedContainerCount = Maps.newHashMap();
     startSequence = Lists.newLinkedList();
     containerLock = new ReentrantLock();
     containerChange = containerLock.newCondition();
     resourceReport = new DefaultResourceReport(appId, appMasterResources);
     zkClient = zookeeperClient;
+    containerStats = HashMultimap.create();
   }
 
   /**
@@ -128,6 +136,7 @@ final class RunningContainers {
                                                                  containerInfo.getHost().getHostName(),
                                                                  controller);
       resourceReport.addRunResources(runnableName, resources);
+      containerStats.put(runnableName, containerInfo);
 
       if (startSequence.isEmpty() || !runnableName.equals(startSequence.peekLast())) {
         startSequence.addLast(runnableName);
@@ -157,6 +166,15 @@ final class RunningContainers {
   }
 
   /**
+   * Given a runnable name, returns a list of {@link org.apache.twill.internal.ContainerInfo} for its instances.
+   * @param runnableName name of a runnable.
+   * @return a list of {@link org.apache.twill.internal.ContainerInfo} for instances of a runnable.
+   */
+  Collection<ContainerInfo> getContainerInfo(String runnableName) {
+    return containerStats.get(runnableName);
+  }
+
+  /**
    * Stops and removes the last running container of the given runnable.
    */
   void removeLast(String runnableName) {
@@ -186,6 +204,7 @@ final class RunningContainers {
       LOG.info("Stopping service: {} {}", runnableName, lastController.getRunId());
       lastController.stopAndWait();
       containers.remove(runnableName, lastContainerId);
+      removeContainerInfo(lastContainerId);
       removeInstanceId(runnableName, maxInstanceId);
       resourceReport.removeRunnableResources(runnableName, lastContainerId);
       containerChange.signalAll();
@@ -232,6 +251,18 @@ final class RunningContainers {
     }
   }
 
+  /**
+   * Returns a Map containing the number of successfully completed containers for each runnable.
+   */
+  Map<String, Integer> getCompletedContainerCount() {
+    containerLock.lock();
+    try {
+      return ImmutableMap.copyOf(completedContainerCount);
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
   void sendToAll(Message message, Runnable completion) {
     containerLock.lock();
     try {
@@ -293,6 +324,7 @@ final class RunningContainers {
       }
       containers.clear();
       runnableInstances.clear();
+      containerStats.clear();
     } finally {
       containerLock.unlock();
     }
@@ -320,6 +352,7 @@ final class RunningContainers {
     ContainerState state = status.getState();
 
     try {
+      removeContainerInfo(containerId);
       Map<String, TwillContainerController> lookup = containers.column(containerId);
       if (lookup.isEmpty()) {
         // It's OK because if a container is stopped through removeLast, this would be empty.
@@ -346,6 +379,12 @@ final class RunningContainers {
         TwillContainerController controller = completedEntry.getValue();
         controller.completed(exitStatus);
 
+        if (exitStatus == ContainerExitCodes.SUCCESS) {
+          if (!completedContainerCount.containsKey(runnableName)) {
+            completedContainerCount.put(runnableName, 0);
+          }
+          completedContainerCount.put(runnableName, completedContainerCount.get(runnableName) + 1);
+        }
         removeInstanceId(runnableName, getInstanceId(controller.getRunId()));
         resourceReport.removeRunnableResources(runnableName, containerId);
       }
@@ -456,6 +495,21 @@ final class RunningContainers {
   }
 
   /**
+   * Given the containerId, removes the corresponding containerInfo.
+   * @param containerId Id for the container to be removed.
+   * @return Returns {@code false} if container with the provided id was not found, {@code true} otherwise.
+   */
+  private boolean removeContainerInfo(String containerId) {
+    for (ContainerInfo containerInfo : this.containerStats.values()) {
+      if (containerInfo.getId().equals(containerId)) {
+        this.containerStats.values().remove(containerInfo);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * A helper class that overrides the debug port of the resources with the live info from the container controller.
    */
   private static class DynamicTwillRunResources extends DefaultTwillRunResources {


[4/6] git commit: (TWILL-101) Schedule HDFS delegation token update before it expires.

Posted by ch...@apache.org.
(TWILL-101) Schedule HDFS delegation token update before it expires.

Signed-off-by: Terence Yim <te...@continuuity.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/7a72ce18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/7a72ce18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/7a72ce18

Branch: refs/heads/site
Commit: 7a72ce18e37d94d5baf0237e24b7aaeb8ca7647f
Parents: 8cf4cd0
Author: Terence Yim <te...@continuuity.com>
Authored: Tue Sep 23 01:58:22 2014 -0700
Committer: Terence Yim <te...@continuuity.com>
Committed: Tue Sep 23 02:21:58 2014 -0700

----------------------------------------------------------------------
 .../org/apache/twill/yarn/YarnTwillRunnerService.java     | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7a72ce18/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 4c16cc1..2634e78 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -309,8 +309,14 @@ public final class YarnTwillRunnerService extends AbstractIdleService implements
 
     // Schedule an updater for updating HDFS delegation tokens
     if (UserGroupInformation.isSecurityEnabled()) {
-      long delay = yarnConfig.getLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
-                                      DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+      long renewalInterval = yarnConfig.getLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+                                                DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+      // Schedule it five minutes before it expires.
+      long delay = renewalInterval - TimeUnit.MINUTES.toMillis(5);
+      // Just to safeguard. In practice, the value shouldn't be that small, otherwise nothing could work.
+      if (delay <= 0) {
+        delay = (renewalInterval <= 2) ? 1 : renewalInterval / 2;
+      }
       scheduleSecureStoreUpdate(new LocationSecureStoreUpdater(yarnConfig, locationFactory),
                                 delay, delay, TimeUnit.MILLISECONDS);
     }


[6/6] git commit: Merge branch 'master' into site

Posted by ch...@apache.org.
Merge branch 'master' into site


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/128a16f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/128a16f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/128a16f3

Branch: refs/heads/site
Commit: 128a16f353e3e71b83ffc9faf7e96f9daa6fc8b9
Parents: 215bc0c 7f3d7b8
Author: Terence Yim <te...@cask.co>
Authored: Tue Oct 7 17:04:22 2014 -0700
Committer: Terence Yim <te...@cask.co>
Committed: Tue Oct 7 17:04:22 2014 -0700

----------------------------------------------------------------------
 .../org/apache/twill/yarn/YarnTwillRunnerService.java     | 10 ++++++++--
 .../src/test/java/org/apache/twill/yarn/DebugTestRun.java |  8 ++++----
 .../java/org/apache/twill/yarn/ResourceReportTestRun.java | 10 +++++-----
 .../twill/internal/zookeeper/LeaderElectionTest.java      |  6 +++---
 4 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------



[3/6] git commit: Merge branch 'master' into site

Posted by ch...@apache.org.
Merge branch 'master' into site


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/215bc0cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/215bc0cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/215bc0cf

Branch: refs/heads/site
Commit: 215bc0cf01f38a1d7f0b95fab4e525bb2c81a38e
Parents: 0004452 8cf4cd0
Author: Terence Yim <te...@continuuity.com>
Authored: Mon Sep 22 22:15:59 2014 -0700
Committer: Terence Yim <te...@continuuity.com>
Committed: Mon Sep 22 22:15:59 2014 -0700

----------------------------------------------------------------------
 pom.xml                                         |   2 +
 .../main/java/org/apache/twill/api/Hosts.java   |  65 +++
 .../main/java/org/apache/twill/api/Racks.java   |  65 +++
 .../apache/twill/api/ResourceSpecification.java |  58 +--
 .../apache/twill/api/TwillSpecification.java    | 175 ++++++++-
 .../internal/DefaultResourceSpecification.java  |  25 --
 .../internal/DefaultTwillSpecification.java     |  86 +++-
 .../org/apache/twill/internal/Constants.java    |   6 +
 .../json/ResourceSpecificationCodec.java        |   6 +-
 .../json/TwillSpecificationAdapter.java         |   3 +
 .../internal/json/TwillSpecificationCodec.java  |  36 +-
 .../json/ResourceSpecificationCodecTest.java    |  19 +-
 .../internal/yarn/Hadoop20YarnAMClient.java     |  17 +-
 .../internal/yarn/Hadoop20YarnAppClient.java    |   7 +
 .../internal/yarn/Hadoop21YarnAMClient.java     |  28 +-
 .../internal/yarn/Hadoop21YarnAppClient.java    |   8 +
 .../internal/yarn/Hadoop22YarnAMClient.java     |  42 ++
 .../appmaster/AllocationSpecification.java      | 105 +++++
 .../appmaster/ApplicationMasterService.java     | 169 +++++++-
 .../appmaster/PlacementPolicyManager.java       | 104 +++++
 .../internal/appmaster/ProvisionRequest.java    |  11 +
 .../appmaster/RunnableContainerRequest.java     |  11 +-
 .../appmaster/RunnableProcessLauncher.java      |   5 +-
 .../internal/appmaster/RunningContainers.java   |  54 +++
 .../internal/yarn/AbstractYarnAMClient.java     |  70 +++-
 .../yarn/VersionDetectYarnAMClientFactory.java  |  25 +-
 .../yarn/VersionDetectYarnAppClientFactory.java |   2 +-
 .../twill/internal/yarn/YarnAMClient.java       |  15 +
 .../twill/internal/yarn/YarnAppClient.java      |  10 +
 .../apache/twill/internal/yarn/YarnUtils.java   |  38 +-
 .../apache/twill/yarn/YarnTwillPreparer.java    |   4 +-
 .../twill/yarn/PlacementPolicyTestRun.java      | 391 +++++++++++++++++++
 .../org/apache/twill/yarn/YarnTestSuite.java    |   3 +-
 .../org/apache/twill/yarn/YarnTestUtils.java    |  23 +-
 34 files changed, 1522 insertions(+), 166 deletions(-)
----------------------------------------------------------------------



[5/6] git commit: Make timeout for test cases longer to avoid failing too much in Travis when it gets scheduled on relatively slow boxes.

Posted by ch...@apache.org.
Make timeout for test cases longer to avoid failing too much in Travis when it gets scheduled on relatively slow boxes.

Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/7f3d7b8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/7f3d7b8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/7f3d7b8b

Branch: refs/heads/site
Commit: 7f3d7b8b77211819f2f91a561b1a62ce43d510c0
Parents: 7a72ce1
Author: Terence Yim <te...@continuuity.com>
Authored: Tue Sep 23 13:27:10 2014 -0700
Committer: Terence Yim <te...@continuuity.com>
Committed: Tue Sep 23 13:27:10 2014 -0700

----------------------------------------------------------------------
 .../src/test/java/org/apache/twill/yarn/DebugTestRun.java |  8 ++++----
 .../java/org/apache/twill/yarn/ResourceReportTestRun.java | 10 +++++-----
 .../twill/internal/zookeeper/LeaderElectionTest.java      |  6 +++---
 3 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7f3d7b8b/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
index 0cb6fc8..e9f00ad 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
@@ -118,9 +118,9 @@ public class DebugTestRun extends BaseYarnTest {
       }
     }, Threads.SAME_THREAD_EXECUTOR);
 
-    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+    Assert.assertTrue(running.await(120, TimeUnit.SECONDS));
     Assert.assertTrue(waitForDebugPort(controller, "r1", 30));
-    controller.stop().get(30, TimeUnit.SECONDS);
+    controller.stop().get(120, TimeUnit.SECONDS);
     // Sleep a bit before exiting.
     TimeUnit.SECONDS.sleep(2);
   }
@@ -142,10 +142,10 @@ public class DebugTestRun extends BaseYarnTest {
       }
     }, Threads.SAME_THREAD_EXECUTOR);
 
-    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+    Assert.assertTrue(running.await(120, TimeUnit.SECONDS));
     Assert.assertTrue(waitForDebugPort(controller, "r1", 30));
     Assert.assertTrue(waitForDebugPort(controller, "r2", 30));
-    controller.stop().get(30, TimeUnit.SECONDS);
+    controller.stop().get(120, TimeUnit.SECONDS);
     // Sleep a bit before exiting.
     TimeUnit.SECONDS.sleep(2);
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7f3d7b8b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
index 928a525..3a82272 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
@@ -103,10 +103,10 @@ public final class ResourceReportTestRun extends BaseYarnTest {
       }
     }, Threads.SAME_THREAD_EXECUTOR);
 
-    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+    Assert.assertTrue(running.await(120, TimeUnit.SECONDS));
 
     Iterable<Discoverable> envEchoServices = controller.discoverService("envecho");
-    Assert.assertTrue(YarnTestUtils.waitForSize(envEchoServices, 1, 30));
+    Assert.assertTrue(YarnTestUtils.waitForSize(envEchoServices, 1, 120));
 
     // TODO: check virtual cores once yarn adds the ability
     Map<String, String> expectedValues = Maps.newHashMap();
@@ -128,7 +128,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
       }
     }
 
-    controller.stop().get(30, TimeUnit.SECONDS);
+    controller.stop().get(120, TimeUnit.SECONDS);
     // Sleep a bit before exiting.
     TimeUnit.SECONDS.sleep(2);
   }
@@ -215,7 +215,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
       }
     }, Threads.SAME_THREAD_EXECUTOR);
 
-    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+    Assert.assertTrue(running.await(120, TimeUnit.SECONDS));
 
     // wait for 3 echo servers to come up
     Iterable<Discoverable> echoServices = controller.discoverService("echo");
@@ -280,7 +280,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
       Assert.assertEquals(512, resources.getMemoryMB());
     }
 
-    controller.stop().get(30, TimeUnit.SECONDS);
+    controller.stop().get(120, TimeUnit.SECONDS);
     // Sleep a bit before exiting.
     TimeUnit.SECONDS.sleep(2);
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7f3d7b8b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
index d58c11c..847b149 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
@@ -56,7 +56,7 @@ public class LeaderElectionTest {
 
   private static InMemoryZKServer zkServer;
 
-  @Test(timeout = 100000)
+  @Test(timeout = 150000)
   public void testElection() throws ExecutionException, InterruptedException, BrokenBarrierException {
     ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -129,7 +129,7 @@ public class LeaderElectionTest {
     }
   }
 
-  @Test(timeout = 100000)
+  @Test(timeout = 150000)
   public void testCancel() throws InterruptedException, IOException {
     List<LeaderElection> leaderElections = Lists.newArrayList();
     List<ZKClientService> zkClients = Lists.newArrayList();
@@ -213,7 +213,7 @@ public class LeaderElectionTest {
     }
   }
 
-  @Test(timeout = 100000)
+  @Test(timeout = 150000)
   public void testDisconnect() throws IOException, InterruptedException {
     File zkDataDir = tmpFolder.newFolder();
     InMemoryZKServer ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).build();