You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/09/23 05:23:45 UTC

[2/2] git commit: TWILL-87 : Adding Container Placement Policy control.

TWILL-87 : Adding Container Placement Policy control.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/8cf4cd02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/8cf4cd02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/8cf4cd02

Branch: refs/heads/master
Commit: 8cf4cd02c922595c703eff4076ca74633421b301
Parents: 34af22b
Author: Gourav Khaneja <go...@gmail.com>
Authored: Sat Aug 2 00:25:02 2014 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Sat Aug 2 23:04:56 2014 -0700

----------------------------------------------------------------------
 pom.xml                                         |   2 +
 .../main/java/org/apache/twill/api/Hosts.java   |  65 +++
 .../main/java/org/apache/twill/api/Racks.java   |  65 +++
 .../apache/twill/api/ResourceSpecification.java |  58 +--
 .../apache/twill/api/TwillSpecification.java    | 175 ++++++++-
 .../internal/DefaultResourceSpecification.java  |  25 --
 .../internal/DefaultTwillSpecification.java     |  86 +++-
 .../org/apache/twill/internal/Constants.java    |   6 +
 .../json/ResourceSpecificationCodec.java        |   6 +-
 .../json/TwillSpecificationAdapter.java         |   3 +
 .../internal/json/TwillSpecificationCodec.java  |  36 +-
 .../json/ResourceSpecificationCodecTest.java    |  19 +-
 .../internal/yarn/Hadoop20YarnAMClient.java     |  17 +-
 .../internal/yarn/Hadoop20YarnAppClient.java    |   7 +
 .../internal/yarn/Hadoop21YarnAMClient.java     |  28 +-
 .../internal/yarn/Hadoop21YarnAppClient.java    |   8 +
 .../internal/yarn/Hadoop22YarnAMClient.java     |  42 ++
 .../appmaster/AllocationSpecification.java      | 105 +++++
 .../appmaster/ApplicationMasterService.java     | 169 +++++++-
 .../appmaster/PlacementPolicyManager.java       | 104 +++++
 .../internal/appmaster/ProvisionRequest.java    |  11 +
 .../appmaster/RunnableContainerRequest.java     |  11 +-
 .../appmaster/RunnableProcessLauncher.java      |   5 +-
 .../internal/appmaster/RunningContainers.java   |  54 +++
 .../internal/yarn/AbstractYarnAMClient.java     |  70 +++-
 .../yarn/VersionDetectYarnAMClientFactory.java  |  25 +-
 .../yarn/VersionDetectYarnAppClientFactory.java |   2 +-
 .../twill/internal/yarn/YarnAMClient.java       |  15 +
 .../twill/internal/yarn/YarnAppClient.java      |  10 +
 .../apache/twill/internal/yarn/YarnUtils.java   |  38 +-
 .../apache/twill/yarn/YarnTwillPreparer.java    |   4 +-
 .../twill/yarn/PlacementPolicyTestRun.java      | 391 +++++++++++++++++++
 .../org/apache/twill/yarn/YarnTestSuite.java    |   3 +-
 .../org/apache/twill/yarn/YarnTestUtils.java    |  23 +-
 34 files changed, 1522 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dfb7394..67ab4fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -508,6 +508,7 @@
                                 <configuration>
                                     <sources>
                                         <source>src/main/hadoop21</source>
+                                        <source>src/main/hadoop22</source>
                                     </sources>
                                 </configuration>
                             </execution>
@@ -552,6 +553,7 @@
                                 <configuration>
                                     <sources>
                                         <source>src/main/hadoop21</source>
+                                        <source>src/main/hadoop22</source>
                                     </sources>
                                 </configuration>
                             </execution>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/Hosts.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Hosts.java b/twill-api/src/main/java/org/apache/twill/api/Hosts.java
new file mode 100644
index 0000000..90a27ad
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/Hosts.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.api;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Represents a list of hosts.
+ */
+
+public class Hosts {
+  private final Set<String> hosts;
+
+  public Hosts(Set<String> hosts) {
+    this.hosts = ImmutableSet.copyOf(hosts);
+  }
+
+  public Hosts(String host, String...moreHosts) {
+    this.hosts = ImmutableSet.<String>builder()
+      .add(host)
+      .addAll(Arrays.asList(moreHosts))
+      .build();
+  }
+
+  /**
+   * Convenience method to create an instance of {@link org.apache.twill.api.Hosts}.
+   * @param host A host to be added.
+   * @param moreHosts A list of hosts to be added.
+   * @return An instance of {@link org.apache.twill.api.Hosts} containing specified hosts.
+   */
+  public static Hosts of(String host, String...moreHosts) {
+    return new Hosts(host, moreHosts);
+  }
+
+  /**
+   * Get the list of hosts.
+   * @return list of hosts.
+   */
+  public Set<String> get() {
+    return this.hosts;
+  }
+
+  @Override
+  public String toString() {
+    return this.hosts.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/Racks.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Racks.java b/twill-api/src/main/java/org/apache/twill/api/Racks.java
new file mode 100644
index 0000000..04eedd0
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/Racks.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.api;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Represents a list of Racks.
+ */
+
+public class Racks {
+  private final Set<String> racks;
+
+  public Racks(Set<String> racks) {
+    this.racks = ImmutableSet.copyOf(racks);
+  }
+
+  public Racks(String rack, String...moreRacks) {
+    this.racks = ImmutableSet.<String>builder()
+      .add(rack)
+      .addAll(Arrays.asList(moreRacks))
+      .build();
+  }
+
+  /**
+   * Convenience method to create an instance of {@link org.apache.twill.api.Racks}.
+   * @param rack A rack to be added.
+   * @param moreRacks A list of racks to be added.
+   * @return An instance of {@link org.apache.twill.api.Racks} containing specified racks.
+   */
+  public static Racks of(String rack, String...moreRacks) {
+    return new Racks(rack, moreRacks);
+  }
+
+  /**
+   * Get the list of racks.
+   * @return list of racks.
+   */
+  public Set<String> get() {
+    return this.racks;
+  }
+
+  @Override
+  public String toString() {
+    return this.racks.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java b/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
index 4b602bc..2865caf 100644
--- a/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
@@ -17,14 +17,8 @@
  */
 package org.apache.twill.api;
 
-import com.google.common.collect.Iterables;
 import org.apache.twill.internal.DefaultResourceSpecification;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
 /**
  * This interface provides specifications for resource requirements including set and get methods
  * for number of cores, amount of memory, and number of instances.
@@ -85,20 +79,6 @@ public interface ResourceSpecification {
   int getInstances();
 
   /**
-   * Returns the execution hosts, expects Fully Qualified Domain Names host + domain.
-   * This is a suggestion for the scheduler depending on cluster load it may ignore it
-   * @return An array containing the hosts where the containers should run
-   */
-  List<String> getHosts();
-
-  /**
-   * Returns the execution racks.
-   * This is a suggestion for the scheduler depending on cluster load it may ignore it
-   * @return An array containing the racks where the containers should run
-   */
-  List<String> getRacks();
-
-  /**
    * Builder for creating {@link ResourceSpecification}.
    */
   static final class Builder {
@@ -108,8 +88,6 @@ public interface ResourceSpecification {
     private int uplink = -1;
     private int downlink = -1;
     private int instances = 1;
-    private List<String> hosts = new LinkedList<String>();
-    private List<String> racks = new LinkedList<String>();
 
     public static CoreSetter with() {
       return new Builder().new CoreSetter();
@@ -150,42 +128,10 @@ public interface ResourceSpecification {
     }
 
     public final class AfterUplink extends Build {
-      public AfterDownlink setDownlink(int downlink, SizeUnit unit) {
+      public Done setDownlink(int downlink, SizeUnit unit) {
         Builder.this.downlink = downlink * unit.multiplier;
-        return new AfterDownlink();
-      }
-    }
-
-    public final class AfterHosts extends Build {
-      public Done setRacks(String... racks) {
-        if (racks != null) {
-          Builder.this.racks = Arrays.asList(racks);
-        }
         return new Done();
       }
-
-      public Done setRacks(Iterable<String> racks) {
-        if (racks != null) {
-          Iterables.addAll(Builder.this.racks, racks);
-        }
-        return new Done();
-      }
-    }
-
-    public final class AfterDownlink extends Build {
-      public AfterHosts setHosts(String... hosts) {
-        if (hosts != null) {
-          Builder.this.hosts = Arrays.asList(hosts);
-        }
-        return new AfterHosts();
-      }
-
-      public AfterHosts setHosts(Iterable<String> hosts) {
-        if (hosts != null) {
-          Iterables.addAll(Builder.this.hosts, hosts);
-        }
-        return new AfterHosts();
-      }
     }
 
     public final class Done extends Build {
@@ -193,7 +139,7 @@ public interface ResourceSpecification {
 
     public abstract class Build {
       public ResourceSpecification build() {
-        return new DefaultResourceSpecification(cores, memory, instances, uplink, downlink, hosts, racks);
+        return new DefaultResourceSpecification(cores, memory, instances, uplink, downlink);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java b/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
index 3931d04..ed37190 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
@@ -60,6 +60,46 @@ public interface TwillSpecification {
   }
 
   /**
+   * Defines a container placement policy.
+   */
+  interface PlacementPolicy {
+
+    /**
+     * Lists different types of Placement Policies available.
+     */
+    enum Type {
+      /**
+       * Runnables should be scattered over different hosts.
+       */
+      DISTRIBUTED,
+      /**
+       * No specific placement policy.
+       */
+      DEFAULT
+    }
+
+    /**
+     * @return Set of {@link org.apache.twill.api.TwillRunnable} names that belongs to this placement policy.
+     */
+    Set<String> getNames();
+
+    /**
+     * @return {@link org.apache.twill.api.TwillSpecification.PlacementPolicy.Type Type} of this placement policy.
+     */
+    Type getType();
+
+    /**
+     * @return Set of hosts associated with this placement policy.
+     */
+    Set<String> getHosts();
+
+    /**
+     * @return Set of racks associated with this placement policy.
+     */
+    Set<String> getRacks();
+  }
+
+  /**
    * @return Name of the application.
    */
   String getName();
@@ -75,6 +115,11 @@ public interface TwillSpecification {
   List<Order> getOrders();
 
   /**
+   * @return Returns all {@link org.apache.twill.api.TwillSpecification.PlacementPolicy} for this application.
+   */
+  List<PlacementPolicy> getPlacementPolicies();
+
+  /**
    * @return The {@link EventHandlerSpecification} for the {@link EventHandler} to be used for this application,
    *         or {@code null} if no event handler has been provided.
    */
@@ -89,6 +134,7 @@ public interface TwillSpecification {
     private String name;
     private Map<String, RuntimeSpecification> runnables = Maps.newHashMap();
     private List<Order> orders = Lists.newArrayList();
+    private List<PlacementPolicy> placementPolicies = Lists.newArrayList();
     private EventHandlerSpecification eventHandler;
 
     public static NameSetter with() {
@@ -128,6 +174,8 @@ public interface TwillSpecification {
       FirstOrder withOrder();
 
       AfterOrder anyOrder();
+
+      PlacementPolicySetter withPlacementPolicy();
     }
 
     public final class RunnableSetter implements MoreRunnable, AfterRunnable {
@@ -170,6 +218,11 @@ public interface TwillSpecification {
       public AfterOrder anyOrder() {
         return new OrderSetter();
       }
+
+      @Override
+      public PlacementPolicySetter withPlacementPolicy() {
+        return new PlacementPolicySetter();
+      }
     }
 
     /**
@@ -252,6 +305,125 @@ public interface TwillSpecification {
       }
     }
 
+    /**
+     * Interface to add placement policies to the application.
+     */
+    public interface MorePlacementPolicies {
+
+      /**
+       * Specify hosts for a list of runnables.
+       * @param hosts {@link org.apache.twill.api.Hosts} specifying a set of hosts.
+       * @param runnableName a runnable name.
+       * @param runnableNames a list of runnable names.
+       * @return A reference to either add more placement policies or skip to defining execution order.
+       */
+      PlacementPolicySetter add(Hosts hosts, String runnableName, String... runnableNames);
+
+      /**
+       * Specify racks for a list of runnables.
+       * @param racks {@link org.apache.twill.api.Racks} specifying a set of racks.
+       * @param runnableName a runnable name.
+       * @param runnableNames a list of runnable names.
+       * @return A reference to either add more placement policies or skip to defining execution order.
+       */
+      PlacementPolicySetter add(Racks racks, String runnableName, String... runnableNames);
+
+      /**
+       * Specify hosts and racks for a list of runnables.
+       * @param hosts {@link org.apache.twill.api.Hosts} specifying a set of hosts.
+       * @param racks {@link org.apache.twill.api.Racks} specifying a set of racks.
+       * @param runnableName a runnable name.
+       * @param runnableNames a list of runnable names.
+       * @return A reference to either add more placement policies or skip to defining execution order.
+       */
+      PlacementPolicySetter add(Hosts hosts, Racks racks, String runnableName, String... runnableNames);
+
+      /**
+       * Specify a placement policy for a list of runnables.
+       * @param type {@link PlacementPolicy.Type} specifying a specific placement policy type.
+       * @param runnableName a runnable name.
+       * @param runnableNames a list of runnable names.
+       * @return A reference to either add more placement policies or skip to defining execution order.
+       */
+      PlacementPolicySetter add(PlacementPolicy.Type type, String runnableName, String... runnableNames);
+    }
+
+    /**
+     * Interface to define execution order after adding placement policies.
+     */
+    public interface AfterPlacementPolicy {
+      /**
+       * Start defining execution order.
+       */
+      FirstOrder withOrder();
+
+      /**
+       * No particular execution order is needed.
+       */
+      AfterOrder anyOrder();
+    }
+
+    public final class PlacementPolicySetter implements MorePlacementPolicies, AfterPlacementPolicy {
+
+      @Override
+      public PlacementPolicySetter add(Hosts hosts, String runnableName, String... runnableNames) {
+        return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, hosts, null, runnableName, runnableNames);
+      }
+
+      @Override
+      public PlacementPolicySetter add(Racks racks, String runnableName, String... runnableNames) {
+        return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, null, racks, runnableName, runnableNames);
+      }
+
+      @Override
+      public PlacementPolicySetter add(Hosts hosts, Racks racks, String runnableName, String... runnableNames) {
+        return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, hosts, racks, runnableName, runnableNames);
+      }
+
+      @Override
+      public PlacementPolicySetter add(PlacementPolicy.Type type, String runnableName, String...runnableNames) {
+        return addPlacementPolicy(type, null, null, runnableName, runnableNames);
+      }
+
+      private PlacementPolicySetter addPlacementPolicy(PlacementPolicy.Type type, Hosts hosts, Racks racks,
+                                                       String runnableName, String...runnableNames) {
+        Preconditions.checkArgument(runnableName != null, "Name cannot be null.");
+        Preconditions.checkArgument(runnables.containsKey(runnableName), "Runnable not exists.");
+        Preconditions.checkArgument(!contains(runnableName),
+                                    "Runnable (" + runnableName + ") cannot belong to more than one Placement Policy");
+        Set<String> runnableNamesSet = Sets.newHashSet(runnableName);
+        for (String name : runnableNames) {
+          Preconditions.checkArgument(name != null, "Name cannot be null.");
+          Preconditions.checkArgument(runnables.containsKey(name), "Runnable not exists.");
+          Preconditions.checkArgument(!contains(name),
+                                      "Runnable (" + name + ") cannot belong to more than one Placement Policy");
+          runnableNamesSet.add(name);
+        }
+        placementPolicies.add(
+          new DefaultTwillSpecification.DefaultPlacementPolicy(runnableNamesSet, type, hosts, racks));
+        return this;
+      }
+
+      private boolean contains(String runnableName) {
+        for (PlacementPolicy placementPolicy : placementPolicies) {
+          if (placementPolicy.getNames().contains(runnableName)) {
+            return true;
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public FirstOrder withOrder() {
+        return new OrderSetter();
+      }
+
+      @Override
+      public AfterOrder anyOrder() {
+        return new OrderSetter();
+      }
+    }
+
     public interface FirstOrder {
       NextOrder begin(String name, String...names);
     }
@@ -303,8 +475,7 @@ public interface TwillSpecification {
 
         // For all unordered runnables, add it to the end of orders list
         orders.add(new DefaultTwillSpecification.DefaultOrder(runnableNames, Order.Type.STARTED));
-
-        return new DefaultTwillSpecification(name, runnables, orders, eventHandler);
+        return new DefaultTwillSpecification(name, runnables, orders, placementPolicies, eventHandler);
       }
 
       private void addOrder(final Order.Type type, String name, String...names) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
index 2998165..1327ce5 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
@@ -17,12 +17,8 @@
  */
 package org.apache.twill.internal;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.twill.api.ResourceSpecification;
 
-import java.util.Collections;
-import java.util.List;
-
 /**
  * Straightforward implementation of {@link org.apache.twill.api.ResourceSpecification}.
  */
@@ -32,24 +28,13 @@ public final class DefaultResourceSpecification implements ResourceSpecification
   private final int instances;
   private final int uplink;
   private final int downlink;
-  private final List<String> hosts;
-  private final List<String> racks;
 
   public DefaultResourceSpecification(int virtualCores, int memorySize, int instances, int uplink, int downlink) {
-    this(virtualCores, memorySize, instances, uplink, downlink,
-            Collections.<String>emptyList(), Collections.<String>emptyList());
-  }
-
-  public DefaultResourceSpecification(int virtualCores, int memorySize, int instances,
-                                      int uplink, int downlink,
-                                      List<String> hosts, List<String> racks) {
     this.virtualCores = virtualCores;
     this.memorySize = memorySize;
     this.instances = instances;
     this.uplink = uplink;
     this.downlink = downlink;
-    this.hosts = ImmutableList.copyOf(hosts);
-    this.racks = ImmutableList.copyOf(racks);
   }
 
   @Deprecated
@@ -74,16 +59,6 @@ public final class DefaultResourceSpecification implements ResourceSpecification
   }
 
   @Override
-  public List<String> getHosts() {
-    return this.hosts;
-  }
-
-  @Override
-  public List<String> getRacks() {
-    return this.racks;
-  }
-
-  @Override
   public int getUplink() {
     return uplink;
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
index fdb8b32..184afb3 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
@@ -22,9 +22,12 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.Hosts;
+import org.apache.twill.api.Racks;
 import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.api.TwillSpecification;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,13 +41,16 @@ public final class DefaultTwillSpecification implements TwillSpecification {
   private final String name;
   private final Map<String, RuntimeSpecification> runnables;
   private final List<Order> orders;
+  private final List<PlacementPolicy> placementPolicies;
   private final EventHandlerSpecification eventHandler;
 
   public DefaultTwillSpecification(String name, Map<String, RuntimeSpecification> runnables,
-                                   List<Order> orders, EventHandlerSpecification eventHandler) {
+                                   List<Order> orders, List<PlacementPolicy> placementPolicies,
+                                   EventHandlerSpecification eventHandler) {
     this.name = name;
     this.runnables = ImmutableMap.copyOf(runnables);
     this.orders = ImmutableList.copyOf(orders);
+    this.placementPolicies = placementPolicies;
     this.eventHandler = eventHandler;
   }
 
@@ -63,6 +69,11 @@ public final class DefaultTwillSpecification implements TwillSpecification {
     return orders;
   }
 
+  @Override
+  public List<PlacementPolicy> getPlacementPolicies() {
+    return placementPolicies;
+  }
+
   @Nullable
   @Override
   public EventHandlerSpecification getEventHandler() {
@@ -100,4 +111,77 @@ public final class DefaultTwillSpecification implements TwillSpecification {
         .toString();
     }
   }
+
+  /**
+   * Straightforward implementation of {@link org.apache.twill.api.TwillSpecification.PlacementPolicy}.
+   */
+  public static final class DefaultPlacementPolicy implements PlacementPolicy {
+
+    private final Set<String> names;
+    private final Type type;
+    private final Hosts hosts;
+    private final Racks racks;
+
+    public DefaultPlacementPolicy(Iterable<String> names, Type type, Hosts hosts, Racks racks) {
+      this.names = ImmutableSet.copyOf(names);
+      this.type = type;
+      this.hosts = hosts;
+      this.racks = racks;
+    }
+
+    public DefaultPlacementPolicy(Iterable<String> names, Type type) {
+      this(names, type, null, null);
+    }
+
+    /**
+     * @return Set of {@link org.apache.twill.api.TwillRunnable} names that belongs to this placement policy.
+     */
+    @Override
+    public Set<String> getNames() {
+      return names;
+    }
+
+    /**
+     * @return {@link org.apache.twill.api.TwillSpecification.PlacementPolicy.Type Type} of this placement policy.
+     */
+    @Override
+    public Type getType() {
+      return type;
+    }
+
+    /**
+     * @return Set of hosts associated with this placement policy.
+     */
+    @Override
+    public Set<String> getHosts() {
+      if (this.hosts == null) {
+        return Collections.emptySet();
+      }
+      return this.hosts.get();
+    }
+
+    /**
+     * @return Set of racks associated with this placement policy.
+     */
+    @Override
+    public Set<String> getRacks() {
+      if (this.racks == null) {
+        return Collections.emptySet();
+      }
+      return this.racks.get();
+    }
+
+    /**
+     * @return String representation of Placement Policy
+     */
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+        .add("names", names)
+        .add("type", type)
+        .add("hosts", hosts)
+        .add("racks", racks)
+        .toString();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index 9912638..fbc6e70 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -31,6 +31,12 @@ public final class Constants {
 
   public static final long PROVISION_TIMEOUT = 30000;
 
+  /**
+   * Milliseconds AM should wait for RM to allocate a constrained provision request.
+   * On timeout, AM relaxes the request constraints.
+   */
+  public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
+
   /** Memory size of AM. */
   public static final int APP_MASTER_MEMORY_MB = 512;
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
index 5f7d7ae..412e406 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
@@ -44,8 +44,6 @@ final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecifi
     json.addProperty("instances", src.getInstances());
     json.addProperty("uplink", src.getUplink());
     json.addProperty("downlink", src.getDownlink());
-    json.add("hosts", context.serialize(src.getHosts()));
-    json.add("racks", context.serialize(src.getRacks()));
     return json;
   }
 
@@ -59,9 +57,7 @@ final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecifi
                                             jsonObj.get("memorySize").getAsInt(),
                                             jsonObj.get("instances").getAsInt(),
                                             jsonObj.get("uplink").getAsInt(),
-                                            jsonObj.get("downlink").getAsInt(),
-                                            hosts,
-                                            racks);
+                                            jsonObj.get("downlink").getAsInt());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
index 67c15a2..d882096 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
@@ -36,6 +36,7 @@ import org.apache.twill.api.TwillRunnableSpecification;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder;
 import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder;
+import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationPlacementPolicyCoder;
 
 import java.io.File;
 import java.io.IOException;
@@ -61,6 +62,8 @@ public final class TwillSpecificationAdapter {
               .serializeNulls()
               .registerTypeAdapter(TwillSpecification.class, new TwillSpecificationCodec())
               .registerTypeAdapter(TwillSpecification.Order.class, new TwillSpecificationOrderCoder())
+              .registerTypeAdapter(TwillSpecification.PlacementPolicy.class,
+                                   new TwillSpecificationPlacementPolicyCoder())
               .registerTypeAdapter(EventHandlerSpecification.class, new EventHandlerSpecificationCoder())
               .registerTypeAdapter(RuntimeSpecification.class, new RuntimeSpecificationCodec())
               .registerTypeAdapter(TwillRunnableSpecification.class, new TwillRunnableSpecificationCodec())

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
index d46434a..96df950 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
@@ -26,6 +26,8 @@ import com.google.gson.JsonParseException;
 import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
 import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.Hosts;
+import org.apache.twill.api.Racks;
 import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.internal.DefaultEventHandlerSpecification;
@@ -50,6 +52,8 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
                                             new TypeToken<Map<String, RuntimeSpecification>>() { }.getType()));
     json.add("orders", context.serialize(src.getOrders(),
                                          new TypeToken<List<TwillSpecification.Order>>() { }.getType()));
+    json.add("placementPolicies", context.serialize(
+      src.getPlacementPolicies(), new TypeToken<List<TwillSpecification.PlacementPolicy>>() { }.getType()));
     EventHandlerSpecification eventHandler = src.getEventHandler();
     if (eventHandler != null) {
       json.add("handler", context.serialize(eventHandler, EventHandlerSpecification.class));
@@ -68,6 +72,8 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
       jsonObj.get("runnables"), new TypeToken<Map<String, RuntimeSpecification>>() { }.getType());
     List<TwillSpecification.Order> orders = context.deserialize(
       jsonObj.get("orders"), new TypeToken<List<TwillSpecification.Order>>() { }.getType());
+    List<TwillSpecification.PlacementPolicy> placementPolicies = context.deserialize(
+      jsonObj.get("placementPolicies"), new TypeToken<List<TwillSpecification.PlacementPolicy>>() { }.getType());
 
     JsonElement handler = jsonObj.get("handler");
     EventHandlerSpecification eventHandler = null;
@@ -75,7 +81,7 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
       eventHandler = context.deserialize(handler, EventHandlerSpecification.class);
     }
 
-    return new DefaultTwillSpecification(name, runnables, orders, eventHandler);
+    return new DefaultTwillSpecification(name, runnables, orders, placementPolicies, eventHandler);
   }
 
   static final class TwillSpecificationOrderCoder implements JsonSerializer<TwillSpecification.Order>,
@@ -101,6 +107,34 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
     }
   }
 
+  static final class TwillSpecificationPlacementPolicyCoder implements
+    JsonSerializer<TwillSpecification.PlacementPolicy>, JsonDeserializer<TwillSpecification.PlacementPolicy> {
+
+    @Override
+    public JsonElement serialize(TwillSpecification.PlacementPolicy src, Type typeOfSrc,
+                                 JsonSerializationContext context) {
+      JsonObject json = new JsonObject();
+      json.add("names", context.serialize(src.getNames(), new TypeToken<Set<String>>() { }.getType()));
+      json.addProperty("type", src.getType().name());
+      json.add("hosts", context.serialize(src.getHosts(), new TypeToken<Set<String>>() { }.getType()));
+      json.add("racks", context.serialize(src.getRacks(), new TypeToken<Set<String>>() { }.getType()));
+      return json;
+    }
+
+    @Override
+    public TwillSpecification.PlacementPolicy deserialize(JsonElement json, Type typeOfT,
+                                                          JsonDeserializationContext context)
+      throws JsonParseException {
+      JsonObject jsonObj = json.getAsJsonObject();
+      Set<String> names = context.deserialize(jsonObj.get("names"), new TypeToken<Set<String>>() { }.getType());
+      TwillSpecification.PlacementPolicy.Type type =
+        TwillSpecification.PlacementPolicy.Type.valueOf(jsonObj.get("type").getAsString());
+      Set<String> hosts = context.deserialize(jsonObj.get("hosts"), new TypeToken<Set<String>>() { }.getType());
+      Set<String> racks = context.deserialize(jsonObj.get("racks"), new TypeToken<Set<String>>() { }.getType());
+      return new DefaultTwillSpecification.DefaultPlacementPolicy(names, type, new Hosts(hosts), new Racks(racks));
+    }
+  }
+
   static final class EventHandlerSpecificationCoder implements JsonSerializer<EventHandlerSpecification>,
                                                                JsonDeserializer<EventHandlerSpecification> {
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java b/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
index e8b0eff..2e949d4 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
@@ -26,8 +26,6 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.unitils.reflectionassert.ReflectionAssert;
 
-import java.util.Arrays;
-
 /**
  * Maybe this checkstyle rule needs to be removed.
  */
@@ -45,13 +43,10 @@ public class ResourceSpecificationCodecTest {
                     "\"memorySize\":1024," +
                     "\"instances\":2," +
                     "\"uplink\":100," +
-                    "\"downlink\":100," +
-                    "\"hosts\":[\"one1\",\"two2\"]," +
-                    "\"racks\":[\"three3\"]" +
+                    "\"downlink\":100" +
             "}";
     final ResourceSpecification expected =
-            new DefaultResourceSpecification(2, 1024, 2, 100, 100,
-                    Arrays.asList("one1", "two2"), Arrays.asList("three3"));
+            new DefaultResourceSpecification(2, 1024, 2, 100, 100);
     final String actualString = gson.toJson(expected);
     Assert.assertEquals(expectedString, actualString);
 
@@ -71,12 +66,9 @@ public class ResourceSpecificationCodecTest {
             .setInstances(3)
             .setUplink(10, ResourceSpecification.SizeUnit.GIGA)
             .setDownlink(5, ResourceSpecification.SizeUnit.GIGA)
-            .setHosts("a1", "b2", "c3")
-            .setRacks("r2")
             .build();
     final DefaultResourceSpecification expected =
-            new DefaultResourceSpecification(5, 4096, 3, 10240, 5120,
-                    Arrays.asList("a1", "b2", "c3"), Arrays.asList("r2"));
+            new DefaultResourceSpecification(5, 4096, 3, 10240, 5120);
     ReflectionAssert.assertLenientEquals(expected, actual);
   }
 
@@ -88,12 +80,9 @@ public class ResourceSpecificationCodecTest {
             .setInstances(3)
             .setUplink(10, ResourceSpecification.SizeUnit.GIGA)
             .setDownlink(5, ResourceSpecification.SizeUnit.GIGA)
-            .setHosts(Arrays.asList("a1", "b2", "c3"))
-            .setRacks(Arrays.asList("r2"))
             .build();
     final DefaultResourceSpecification expected =
-            new DefaultResourceSpecification(5, 4096, 3, 10240, 5120,
-                    Arrays.asList("a1", "b2", "c3"), Arrays.asList("r2"));
+            new DefaultResourceSpecification(5, 4096, 3, 10240, 5120);
     ReflectionAssert.assertLenientEquals(expected, actual);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
index 9b66f67..a990cc0 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -100,6 +100,11 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
   }
 
   @Override
+  public int getNMPort() {
+    return Integer.parseInt(System.getenv().get(ApplicationConstants.NM_PORT_ENV));
+  }
+
+  @Override
   protected Resource adjustCapability(Resource resource) {
     int cores = YarnUtils.getVirtualCores(resource);
     int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
@@ -123,7 +128,9 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
 
   @Override
   protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
-                                                               @Nullable String[] hosts, @Nullable String[] racks) {
+                                                               @Nullable String[] hosts, @Nullable String[] racks,
+                                                               boolean relaxLocality) {
+    // Ignore relaxLocality param since the corresponding support is not present in Hadoop 2.0.
     return new AMRMClient.ContainerRequest(capability, hosts, racks, priority, 1);
   }
 
@@ -138,6 +145,14 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
   }
 
   @Override
+  protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    // An empty implementation since Blacklist is not supported in Hadoop-2.0.
+    if (recordUnsupportedFeature("blacklist")) {
+      LOG.warn("Blacklist is not supported in Hadoop 2.0");
+    }
+  }
+
+  @Override
   protected AllocateResult doAllocate(float progress) throws Exception {
     AllocationResponse response = amrmClient.allocate(progress);
     List<RunnableProcessLauncher> launchers

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
index 6fdd99e..3f7422d 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.YarnClient;
 import org.apache.hadoop.yarn.client.YarnClientImpl;
@@ -45,6 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.List;
 
 /**
  *
@@ -156,6 +158,11 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
   }
 
   @Override
+  public List<NodeReport> getNodeReports() throws Exception {
+    return this.yarnClient.getNodeReports();
+  }
+
+  @Override
   protected void startUp() throws Exception {
     yarnClient.start();
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
index 78a5135..c5ac458 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -42,11 +42,11 @@ import javax.annotation.Nullable;
 /**
  *
  */
-public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
+public class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
 
   private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAMClient.class);
 
-  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
+  protected static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
 
   static {
     STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
@@ -57,9 +57,9 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
     };
   }
 
-  private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
-  private final Hadoop21YarnNMClient nmClient;
-  private Resource maxCapability;
+  protected final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
+  protected final Hadoop21YarnNMClient nmClient;
+  protected Resource maxCapability;
 
   public Hadoop21YarnAMClient(Configuration conf) {
     super(ApplicationConstants.Environment.CONTAINER_ID.name());
@@ -95,9 +95,15 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
   }
 
   @Override
+  public int getNMPort() {
+    return Integer.parseInt(System.getenv().get(ApplicationConstants.Environment.NM_PORT.name()));
+  }
+
+  @Override
   protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
-                                                               @Nullable String[] hosts, @Nullable String[] racks) {
-    return new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
+                                                               @Nullable String[] hosts, @Nullable String[] racks,
+                                                               boolean relaxLocality) {
+    return new AMRMClient.ContainerRequest(capability, hosts, racks, priority, relaxLocality);
   }
 
   @Override
@@ -111,6 +117,14 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
   }
 
   @Override
+  protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    // An empty implementation since Blacklist is not supported in Hadoop-2.1 AMRMClient.
+    if (recordUnsupportedFeature("blacklist")) {
+      LOG.warn("Blacklist is not supported in Hadoop 2.1 AMRMClient");
+    }
+  }
+
+  @Override
   protected AllocateResult doAllocate(float progress) throws Exception {
     AllocateResponse allocateResponse = amrmClient.allocate(progress);
     List<RunnableProcessLauncher> launchers

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index 20558bb..9b70a86 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@@ -41,6 +42,8 @@ import org.apache.twill.internal.appmaster.ApplicationSubmitter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 /**
  *
  */
@@ -136,6 +139,11 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
   }
 
   @Override
+  public List<NodeReport> getNodeReports() throws Exception {
+    return this.yarnClient.getNodeReports();
+  }
+
+  @Override
   protected void startUp() throws Exception {
     yarnClient.start();
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
new file mode 100644
index 0000000..3ee99e8
--- /dev/null
+++ b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Wrapper class for AMRMClient for Hadoop version 2.2 or greater.
+ */
+public final class Hadoop22YarnAMClient extends Hadoop21YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop22YarnAMClient.class);
+
+  public Hadoop22YarnAMClient(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    LOG.debug("Blacklist Additions: {} , Blacklist Removals: {}", blacklistAdditions, blacklistRemovals);
+    amrmClient.updateBlacklist(blacklistAdditions, blacklistRemovals);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
new file mode 100644
index 0000000..38cb32c
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * This class defines how the containers should be allocated.
+ */
+public class AllocationSpecification {
+
+  /**
+   * Defines different types of allocation strategies.
+   */
+  enum Type {
+    ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+    DEFAULT
+  }
+
+  /**
+   * Resource specification of runnables.
+   */
+  private Resource resource;
+
+  /**
+   * Allocation strategy Type.
+   */
+  private Type type;
+
+  /**
+   * Name of runnable. Set to null if the class represents more than one runnable.
+   */
+  private String runnableName;
+
+  /**
+   * Instance number for the runnable. Set to  0 if the class represents more than one instance / runnable.
+   */
+  private int instanceId;
+
+  public AllocationSpecification(Resource resource) {
+    this(resource, Type.DEFAULT, null, 0);
+  }
+
+  public AllocationSpecification(Resource resource, Type type, String runnableName, int instanceId) {
+    this.resource = resource;
+    this.type = type;
+    this.runnableName = runnableName;
+    this.instanceId = instanceId;
+  }
+
+  public Resource getResource() {
+    return this.resource;
+  }
+
+  public Type getType() {
+    return this.type;
+  }
+
+  public String getRunnableName() {
+    return this.runnableName;
+  }
+
+  public int getInstanceId() {
+    return this.instanceId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof AllocationSpecification)) {
+      return false;
+    }
+    AllocationSpecification other = (AllocationSpecification) obj;
+    return (instanceId == other.instanceId) &&
+      Objects.equal(resource, other.resource) &&
+      Objects.equal(type, other.type) &&
+      Objects.equal(runnableName, other.runnableName);
+  }
+
+  @Override
+  public int hashCode() {
+    return this.resource.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index c25a002..b4f9997 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -25,7 +25,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -64,6 +63,7 @@ import org.apache.twill.filesystem.Location;
 import org.apache.twill.internal.AbstractTwillService;
 import org.apache.twill.internal.Configs;
 import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ContainerInfo;
 import org.apache.twill.internal.DefaultTwillRunResources;
 import org.apache.twill.internal.EnvKeys;
 import org.apache.twill.internal.JvmOptions;
@@ -129,6 +129,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
   private final int reservedMemory;
   private final EventHandler eventHandler;
   private final Location applicationLocation;
+  private final PlacementPolicyManager placementPolicyManager;
 
   private EmbeddedKafkaServer kafkaServer;
   private Queue<RunnableContainerRequest> runnableContainerRequests;
@@ -146,6 +147,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
     this.credentials = createCredentials();
     this.jvmOpts = loadJvmOptions();
     this.reservedMemory = getReservedMemory();
+    this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies());
 
     amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
                                                    Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
@@ -348,7 +350,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
 
   private void doRun() throws Exception {
     // The main loop
-    Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> currentRequest = null;
+    Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> currentRequest = null;
     final Queue<ProvisionRequest> provisioning = Lists.newLinkedList();
 
     YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
@@ -363,6 +365,8 @@ public final class ApplicationMasterService extends AbstractTwillService {
       }
     };
 
+    long requestStartTime = 0;
+    boolean isRequestRelaxed = false;
     long nextTimeoutCheck = System.currentTimeMillis() + Constants.PROVISION_TIMEOUT;
     while (isRunning()) {
       // Call allocate. It has to be made at first in order to be able to get cluster resource availability.
@@ -383,10 +387,25 @@ public final class ApplicationMasterService extends AbstractTwillService {
           runnableContainerRequests.poll();
         }
       }
+
       // Nothing in provision, makes the next batch of provision request
       if (provisioning.isEmpty() && currentRequest != null) {
-        addContainerRequests(currentRequest.getKey(), currentRequest.getValue(), provisioning);
+        manageBlacklist(currentRequest);
+        addContainerRequests(currentRequest.getKey().getResource(), currentRequest.getValue(), provisioning,
+                             currentRequest.getKey().getType());
         currentRequest = null;
+        requestStartTime = System.currentTimeMillis();
+        isRequestRelaxed = false;
+      }
+
+      // Check for provision request timeout i.e. check if any provision request has been pending
+      // for more than the designated time. On timeout, relax the request constraints.
+      if (!provisioning.isEmpty() && !isRequestRelaxed &&
+        (System.currentTimeMillis() - requestStartTime) > Constants.CONSTRAINED_PROVISION_REQUEST_TIMEOUT) {
+        LOG.info("Relaxing provisioning constraints for request {}", provisioning.peek().getRequestId());
+        // Clear the blacklist for the pending provision request(s).
+        clearBlacklist();
+        isRequestRelaxed = true;
       }
 
       nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);
@@ -398,6 +417,37 @@ public final class ApplicationMasterService extends AbstractTwillService {
   }
 
   /**
+   * Manage Blacklist for a given request.
+   */
+  private void manageBlacklist(Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> request) {
+    clearBlacklist();
+
+    //Check the allocation strategy
+    AllocationSpecification currentAllocationSpecification = request.getKey();
+    if (currentAllocationSpecification.getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
+
+      //Check the placement policy
+      TwillSpecification.PlacementPolicy placementPolicy =
+        placementPolicyManager.getPlacementPolicy(currentAllocationSpecification.getRunnableName());
+      if (placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+
+        //Update blacklist with hosts which are running DISTRIBUTED runnables
+        for (String runnable : placementPolicyManager.getFellowRunnables(request.getKey().getRunnableName())) {
+          Collection<ContainerInfo> containerStats =
+            runningContainers.getContainerInfo(runnable);
+          for (ContainerInfo containerInfo : containerStats) {
+            // Yarn Resource Manager may include port in the node name depending on the setting
+            // YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME. It is safe to add both
+            // the names (with and without port) to the blacklist.
+            addToBlacklist(containerInfo.getHost().getHostName());
+            addToBlacklist(containerInfo.getHost().getHostName() + ":" + containerInfo.getPort());
+          }
+        }
+      }
+    }
+  }
+
+  /**
    * Handling containers that are completed.
    */
   private void handleCompleted(List<YarnContainerStatus> completedContainersStatuses) {
@@ -433,13 +483,16 @@ public final class ApplicationMasterService extends AbstractTwillService {
     // Invoke event handler for provision request timeout
     Map<String, ExpectedContainers.ExpectedCount> expiredRequests = expectedContainers.getAll();
     Map<String, Integer> runningCounts = runningContainers.countAll();
+    Map<String, Integer> completedContainerCount = runningContainers.getCompletedContainerCount();
 
     List<EventHandler.TimeoutEvent> timeoutEvents = Lists.newArrayList();
     for (Map.Entry<String, ExpectedContainers.ExpectedCount> entry : expiredRequests.entrySet()) {
       String runnableName = entry.getKey();
       ExpectedContainers.ExpectedCount expectedCount = entry.getValue();
       int runningCount = runningCounts.containsKey(runnableName) ? runningCounts.get(runnableName) : 0;
-      if (expectedCount.getCount() != runningCount) {
+      int completedCount = completedContainerCount.containsKey(runnableName) ?
+        completedContainerCount.get(runnableName) : 0;
+      if (expectedCount.getCount() > runningCount + completedCount) {
         timeoutEvents.add(new EventHandler.TimeoutEvent(runnableName, expectedCount.getCount(),
                                                                    runningCount, expectedCount.getTimestamp()));
       }
@@ -488,42 +541,101 @@ public final class ApplicationMasterService extends AbstractTwillService {
   private Queue<RunnableContainerRequest> initContainerRequests() {
     // Orderly stores container requests.
     Queue<RunnableContainerRequest> requests = Lists.newLinkedList();
-    // For each order in the twillSpec, create container request for each runnable.
+    // For each order in the twillSpec, create container request for runnables, depending on Placement policy.
     for (TwillSpecification.Order order : twillSpec.getOrders()) {
-      // Group container requests based on resource requirement.
-      ImmutableMultimap.Builder<Resource, RuntimeSpecification> builder = ImmutableMultimap.builder();
-      for (String runnableName : order.getNames()) {
+      Set<String> distributedRunnables = placementPolicyManager.getDistributedRunnables(order.getNames());
+      Set<String> defaultRunnables = Sets.newHashSet();
+      defaultRunnables.addAll(order.getNames());
+      defaultRunnables.removeAll(distributedRunnables);
+      Map<AllocationSpecification, Collection<RuntimeSpecification>> requestsMap = Maps.newHashMap();
+      for (String runnableName : distributedRunnables) {
         RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
         Resource capability = createCapability(runtimeSpec.getResourceSpecification());
-        builder.put(capability, runtimeSpec);
+        for (int instanceId = 0; instanceId < runtimeSpec.getResourceSpecification().getInstances(); instanceId++) {
+          AllocationSpecification allocationSpecification =
+            new AllocationSpecification(capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+                                        runnableName, instanceId);
+          addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+        }
       }
-      requests.add(new RunnableContainerRequest(order.getType(), builder.build()));
+      for (String runnableName : defaultRunnables) {
+        RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
+        Resource capability = createCapability(runtimeSpec.getResourceSpecification());
+        AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
+        addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+      }
+      requests.add(new RunnableContainerRequest(order.getType(), requestsMap));
     }
     return requests;
   }
 
   /**
+   * Helper method to create {@link org.apache.twill.internal.appmaster.RunnableContainerRequest}.
+   */
+  private void addAllocationSpecification(AllocationSpecification allocationSpecification,
+                                          Map<AllocationSpecification, Collection<RuntimeSpecification>> map,
+                                          RuntimeSpecification runtimeSpec) {
+    if (!map.containsKey(allocationSpecification)) {
+      map.put(allocationSpecification, Lists.<RuntimeSpecification>newLinkedList());
+    }
+    map.get(allocationSpecification).add(runtimeSpec);
+  }
+
+  /**
    * Adds container requests with the given resource capability for each runtime.
    */
   private void addContainerRequests(Resource capability,
                                     Collection<RuntimeSpecification> runtimeSpecs,
-                                    Queue<ProvisionRequest> provisioning) {
+                                    Queue<ProvisionRequest> provisioning,
+                                    AllocationSpecification.Type allocationType) {
     for (RuntimeSpecification runtimeSpec : runtimeSpecs) {
       String name = runtimeSpec.getName();
       int newContainers = expectedContainers.getExpected(name) - runningContainers.count(name);
       if (newContainers > 0) {
+        if (allocationType.equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
+          //Spawning 1 instance at a time
+          newContainers = 1;
+        }
+        TwillSpecification.PlacementPolicy placementPolicy = placementPolicyManager.getPlacementPolicy(name);
+        Set<String> hosts = Sets.newHashSet();
+        Set<String> racks = Sets.newHashSet();
+        if (placementPolicy != null) {
+          hosts = placementPolicy.getHosts();
+          racks = placementPolicy.getRacks();
+        }
         // TODO: Allow user to set priority?
-        LOG.info("Request {} container with capability {}", newContainers, capability);
+        LOG.info("Request {} container with capability {} for runnable {}", newContainers, capability, name);
         String requestId = amClient.addContainerRequest(capability, newContainers)
-                .addHosts(runtimeSpec.getResourceSpecification().getHosts())
-                .addRacks(runtimeSpec.getResourceSpecification().getRacks())
+                .addHosts(hosts)
+                .addRacks(racks)
                 .setPriority(0).apply();
-        provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers));
+        provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers, allocationType));
       }
     }
   }
 
   /**
+   * Add a resource to the blacklist.
+   */
+  private void addToBlacklist(String resource) {
+    amClient.addToBlacklist(resource);
+  }
+
+  /**
+   * Remove a resource from the blacklist.
+   */
+  private void removeFromBlacklist(String resource) {
+    amClient.removeFromBlacklist(resource);
+  }
+
+  /**
+   * Clears the resource blacklist.
+   */
+  private void clearBlacklist() {
+    amClient.clearBlacklist();
+  }
+
+  /**
    * Launches runnables in the provisioned containers.
    */
   private void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> launchers,
@@ -564,9 +676,12 @@ public final class ApplicationMasterService extends AbstractTwillService {
         amClient.completeContainerRequest(provisionRequest.getRequestId());
       }
 
+      if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName) ||
+        provisioning.peek().getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
+        provisioning.poll();
+      }
       if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName)) {
         LOG.info("Runnable " + runnableName + " fully provisioned with " + containerCount + " instances.");
-        provisioning.poll();
       }
     }
   }
@@ -708,7 +823,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
               }
             } else {
               // Increase the number of instances
-              runnableContainerRequests.add(createRunnableContainerRequest(runnableName));
+              runnableContainerRequests.add(createRunnableContainerRequest(runnableName, newCount - oldCount));
             }
           } finally {
             runningContainers.sendToRunnable(runnableName, message, completion);
@@ -723,6 +838,11 @@ public final class ApplicationMasterService extends AbstractTwillService {
   }
 
   private RunnableContainerRequest createRunnableContainerRequest(final String runnableName) {
+    return createRunnableContainerRequest(runnableName, 1);
+  }
+
+  private RunnableContainerRequest createRunnableContainerRequest(final String runnableName,
+                                                                  final int numberOfInstances) {
     // Find the current order of the given runnable in order to create a RunnableContainerRequest.
     TwillSpecification.Order order = Iterables.find(twillSpec.getOrders(), new Predicate<TwillSpecification.Order>() {
       @Override
@@ -733,7 +853,20 @@ public final class ApplicationMasterService extends AbstractTwillService {
 
     RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
     Resource capability = createCapability(runtimeSpec.getResourceSpecification());
-    return new RunnableContainerRequest(order.getType(), ImmutableMultimap.of(capability, runtimeSpec));
+    Map<AllocationSpecification, Collection<RuntimeSpecification>> requestsMap = Maps.newHashMap();
+    if (placementPolicyManager.getPlacementPolicyType(runnableName).equals(
+      TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+      for (int instanceId = 0; instanceId < numberOfInstances; instanceId++) {
+        AllocationSpecification allocationSpecification =
+          new AllocationSpecification(capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+                                      runnableName, instanceId);
+        addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+      }
+    } else {
+      AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
+      addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+    }
+    return new RunnableContainerRequest(order.getType(), requestsMap);
   }
 
   private Runnable getMessageCompletion(final String messageId, final SettableFuture<String> future) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
new file mode 100644
index 0000000..91a58bb
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.internal.appmaster;
+
+import com.google.common.collect.Sets;
+import org.apache.twill.api.TwillSpecification;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class provides helper functions for operating on a set of Placement Policies.
+ */
+public class PlacementPolicyManager {
+  List<TwillSpecification.PlacementPolicy> placementPolicies;
+
+  public PlacementPolicyManager(List<TwillSpecification.PlacementPolicy> placementPolicies) {
+    this.placementPolicies = placementPolicies;
+  }
+
+  /**
+   * Given a set of runnables, get all runnables which belong to DISTRIBUTED placement policies.
+   * @param givenRunnables Set of runnables.
+   * @return Subset of runnables, which belong to DISTRIBUTED placement policies.
+   */
+  public Set<String> getDistributedRunnables(Set<String> givenRunnables) {
+    Set<String> distributedRunnables = getAllDistributedRunnables();
+    distributedRunnables.retainAll(givenRunnables);
+    return distributedRunnables;
+  }
+
+  /**
+   * Given a runnable, get the type of placement policy. Returns DEFAULT if no placement policy is specified.
+   * @param runnableName Name of runnable.
+   * @return Placement policy type of the runnable.
+   */
+  public TwillSpecification.PlacementPolicy.Type getPlacementPolicyType(String runnableName) {
+    for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+      if (placementPolicy.getNames().contains(runnableName)) {
+        return placementPolicy.getType();
+      }
+    }
+    return TwillSpecification.PlacementPolicy.Type.DEFAULT;
+  }
+
+  /**
+   * Get all runnables which belong to the same Placement policy as the given runnable.
+   * @param runnableName Name of runnable.
+   * @return Set of runnables, with same placement policy.
+   */
+  public Set<String> getFellowRunnables(String runnableName) {
+    for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+      if (placementPolicy.getNames().contains(runnableName)) {
+        return placementPolicy.getNames();
+      }
+    }
+    return Collections.emptySet();
+  }
+
+  /**
+   * Get the placement policy of the runnable.
+   * Returns null if runnable does not belong to a placement policy.
+   * @param runnableName Name of runnable.
+   * @return Placement policy of the runnable.
+   */
+  public TwillSpecification.PlacementPolicy getPlacementPolicy(String runnableName) {
+    for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+      if (placementPolicy.getNames().contains(runnableName)) {
+        return placementPolicy;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Gets all runnables which belong to DISTRIBUTED placement policies.
+   */
+  private Set<String> getAllDistributedRunnables() {
+    Set<String> distributedRunnables = Sets.newHashSet();
+    for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+      if (placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+        distributedRunnables.addAll(placementPolicy.getNames());
+      }
+    }
+    return  distributedRunnables;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
index 002d2a5..a6eba3e 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
@@ -26,11 +26,18 @@ final class ProvisionRequest {
   private final RuntimeSpecification runtimeSpec;
   private final String requestId;
   private int requestCount;
+  private final AllocationSpecification.Type type;
 
   ProvisionRequest(RuntimeSpecification runtimeSpec, String requestId, int requestCount) {
+    this(runtimeSpec, requestId, requestCount, AllocationSpecification.Type.DEFAULT);
+  }
+
+  ProvisionRequest(RuntimeSpecification runtimeSpec, String requestId, int requestCount,
+                   AllocationSpecification.Type type) {
     this.runtimeSpec = runtimeSpec;
     this.requestId = requestId;
     this.requestCount = requestCount;
+    this.type = type;
   }
 
   RuntimeSpecification getRuntimeSpec() {
@@ -49,4 +56,8 @@ final class ProvisionRequest {
     requestCount--;
     return requestCount == 0;
   }
+
+  AllocationSpecification.Type getType() {
+    return this.type;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
index de199ad..2105629 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -20,7 +20,6 @@ package org.apache.twill.internal.appmaster;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.api.TwillSpecification;
@@ -34,12 +33,12 @@ import java.util.Map;
  */
 final class RunnableContainerRequest {
   private final TwillSpecification.Order.Type orderType;
-  private final Iterator<Map.Entry<Resource, Collection<RuntimeSpecification>>> requests;
+  private final Iterator<Map.Entry<AllocationSpecification, Collection<RuntimeSpecification>>> requests;
 
   RunnableContainerRequest(TwillSpecification.Order.Type orderType,
-                           Multimap<Resource, RuntimeSpecification> requests) {
+                           Map<AllocationSpecification, Collection<RuntimeSpecification>> requests) {
     this.orderType = orderType;
-    this.requests = requests.asMap().entrySet().iterator();
+    this.requests = requests.entrySet().iterator();
   }
 
   TwillSpecification.Order.Type getOrderType() {
@@ -51,8 +50,8 @@ final class RunnableContainerRequest {
    * @return The {@link Resource} and {@link Collection} of {@link RuntimeSpecification} or
    *         {@code null} if there is no more request.
    */
-  Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> takeRequest() {
-    Map.Entry<Resource, Collection<RuntimeSpecification>> next = Iterators.getNext(requests, null);
+  Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> takeRequest() {
+    Map.Entry<AllocationSpecification, Collection<RuntimeSpecification>> next = Iterators.getNext(requests, null);
     return next == null ? null : Maps.immutableEntry(next.getKey(), ImmutableList.copyOf(next.getValue()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
index 2126541..0e3aea5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
@@ -68,8 +68,9 @@ public final class RunnableProcessLauncher extends AbstractYarnProcessLauncher<Y
 
     launchContext.setEnvironment(env);
 
-    LOG.info("Launching in container {} at {}, {}",
-             containerInfo.getId(), containerInfo.getHost(), launchContext.getCommands());
+    LOG.info("Launching in container {} at {}:{}, {}",
+             containerInfo.getId(), containerInfo.getHost().getHostName(),
+             containerInfo.getPort(), launchContext.getCommands());
     final Cancellable cancellable = nmClient.start(containerInfo, launchContext);
     launched = true;
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index 5267616..383e5e0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -20,10 +20,13 @@ package org.apache.twill.internal.appmaster;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Table;
 import com.google.common.util.concurrent.FutureCallback;
@@ -53,6 +56,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
@@ -85,20 +89,24 @@ final class RunningContainers {
 
   // Map from runnableName to a BitSet, with the <instanceId> bit turned on for having an instance running.
   private final Map<String, BitSet> runnableInstances;
+  private final Map<String, Integer> completedContainerCount;
   private final DefaultResourceReport resourceReport;
   private final Deque<String> startSequence;
   private final Lock containerLock;
   private final Condition containerChange;
   private final ZKClient zkClient;
+  private final Multimap<String, ContainerInfo> containerStats;
 
   RunningContainers(String appId, TwillRunResources appMasterResources, ZKClient zookeeperClient) {
     containers = HashBasedTable.create();
     runnableInstances = Maps.newHashMap();
+    completedContainerCount = Maps.newHashMap();
     startSequence = Lists.newLinkedList();
     containerLock = new ReentrantLock();
     containerChange = containerLock.newCondition();
     resourceReport = new DefaultResourceReport(appId, appMasterResources);
     zkClient = zookeeperClient;
+    containerStats = HashMultimap.create();
   }
 
   /**
@@ -128,6 +136,7 @@ final class RunningContainers {
                                                                  containerInfo.getHost().getHostName(),
                                                                  controller);
       resourceReport.addRunResources(runnableName, resources);
+      containerStats.put(runnableName, containerInfo);
 
       if (startSequence.isEmpty() || !runnableName.equals(startSequence.peekLast())) {
         startSequence.addLast(runnableName);
@@ -157,6 +166,15 @@ final class RunningContainers {
   }
 
   /**
+   * Given a runnable name, returns a list of {@link org.apache.twill.internal.ContainerInfo} for it's instances.
+   * @param runnableName name of a runnable.
+   * @return a list of {@link org.apache.twill.internal.ContainerInfo} for instances of a runnable.
+   */
+  Collection<ContainerInfo> getContainerInfo(String runnableName) {
+    return containerStats.get(runnableName);
+  }
+
+  /**
    * Stops and removes the last running container of the given runnable.
    */
   void removeLast(String runnableName) {
@@ -186,6 +204,7 @@ final class RunningContainers {
       LOG.info("Stopping service: {} {}", runnableName, lastController.getRunId());
       lastController.stopAndWait();
       containers.remove(runnableName, lastContainerId);
+      removeContainerInfo(lastContainerId);
       removeInstanceId(runnableName, maxInstanceId);
       resourceReport.removeRunnableResources(runnableName, lastContainerId);
       containerChange.signalAll();
@@ -232,6 +251,18 @@ final class RunningContainers {
     }
   }
 
+  /**
+   * Returns a Map containing number of successfully completed containers for all runnables.
+   */
+  Map<String, Integer> getCompletedContainerCount() {
+    containerLock.lock();
+    try {
+      return ImmutableMap.copyOf(completedContainerCount);
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
   void sendToAll(Message message, Runnable completion) {
     containerLock.lock();
     try {
@@ -293,6 +324,7 @@ final class RunningContainers {
       }
       containers.clear();
       runnableInstances.clear();
+      containerStats.clear();
     } finally {
       containerLock.unlock();
     }
@@ -320,6 +352,7 @@ final class RunningContainers {
     ContainerState state = status.getState();
 
     try {
+      removeContainerInfo(containerId);
       Map<String, TwillContainerController> lookup = containers.column(containerId);
       if (lookup.isEmpty()) {
         // It's OK because if a container is stopped through removeLast, this would be empty.
@@ -346,6 +379,12 @@ final class RunningContainers {
         TwillContainerController controller = completedEntry.getValue();
         controller.completed(exitStatus);
 
+        if (exitStatus == ContainerExitCodes.SUCCESS) {
+          if (!completedContainerCount.containsKey(runnableName)) {
+            completedContainerCount.put(runnableName, 0);
+          }
+          completedContainerCount.put(runnableName, completedContainerCount.get(runnableName) + 1);
+        }
         removeInstanceId(runnableName, getInstanceId(controller.getRunId()));
         resourceReport.removeRunnableResources(runnableName, containerId);
       }
@@ -456,6 +495,21 @@ final class RunningContainers {
   }
 
   /**
+   * Given the containerId, removes the corresponding containerInfo.
+   * @param containerId Id for the container to be removed.
+   * @return Returns {@code false} if container with the provided id was not found, {@code true} otherwise.
+   */
+  private boolean removeContainerInfo(String containerId) {
+    for (ContainerInfo containerInfo : this.containerStats.values()) {
+      if (containerInfo.getId().equals(containerId)) {
+        this.containerStats.values().remove(containerInfo);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * A helper class that overrides the debug port of the resources with the live info from the container controller.
    */
   private static class DynamicTwillRunResources extends DefaultTwillRunResources {