You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/09/26 01:51:08 UTC

[3/3] samza git commit: SAMZA-619: SamzaAppMaster changes to enable host-affinity

SAMZA-619: SamzaAppMaster changes to enable host-affinity


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c76132ff
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c76132ff
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c76132ff

Branch: refs/heads/master
Commit: c76132ff2703a587f38662adab3d3ff8e5fca6bb
Parents: 34eaec4
Author: Navina Ramesh <na...@gmail.com>
Authored: Fri Sep 25 16:50:45 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Fri Sep 25 16:50:45 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   4 +
 .../versioned/jobs/configuration-table.html     |  30 ++
 .../apache/samza/container/LocalityManager.java |  16 +-
 .../messages/SetContainerHostMapping.java       |  17 +-
 .../org/apache/samza/job/model/JobModel.java    |  29 ++
 .../apache/samza/container/SamzaContainer.scala |   4 +-
 .../main/scala/org/apache/samza/util/Util.scala |   1 +
 .../org/apache/samza/config/YarnConfig.java     | 163 +++++++
 .../job/yarn/AbstractContainerAllocator.java    | 114 +++++
 .../samza/job/yarn/ContainerAllocator.java      |  78 ++++
 .../apache/samza/job/yarn/ContainerFailure.java |  48 ++
 .../samza/job/yarn/ContainerRequestState.java   | 277 ++++++++++++
 .../apache/samza/job/yarn/ContainerUtil.java    | 212 +++++++++
 .../job/yarn/HostAwareContainerAllocator.java   | 125 +++++
 .../apache/samza/job/yarn/SamzaAppState.java    | 170 +++++++
 .../samza/job/yarn/SamzaContainerRequest.java   | 112 +++++
 .../apache/samza/job/yarn/SamzaTaskManager.java | 283 ++++++++++++
 .../resources/scalate/WEB-INF/views/index.scaml |   7 +-
 .../org/apache/samza/config/YarnConfig.scala    |  58 ---
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |  20 +-
 .../job/yarn/SamzaAppMasterLifecycle.scala      |   4 +-
 .../samza/job/yarn/SamzaAppMasterMetrics.scala  |  12 +-
 .../samza/job/yarn/SamzaAppMasterService.scala  |   2 +-
 .../samza/job/yarn/SamzaAppMasterState.scala    |  56 ---
 .../job/yarn/SamzaAppMasterTaskManager.scala    | 314 -------------
 .../org/apache/samza/job/yarn/YarnJob.scala     |  25 +-
 .../webapp/ApplicationMasterRestServlet.scala   |  10 +-
 .../webapp/ApplicationMasterWebServlet.scala    |   4 +-
 .../samza/job/yarn/TestContainerAllocator.java  | 245 ++++++++++
 .../job/yarn/TestContainerRequestState.java     | 221 +++++++++
 .../yarn/TestHostAwareContainerAllocator.java   | 335 ++++++++++++++
 .../job/yarn/TestSamzaContainerRequest.java     |  43 ++
 .../samza/job/yarn/TestSamzaTaskManager.java    | 434 ++++++++++++++++++
 .../job/yarn/util/MockContainerAllocator.java   |  44 ++
 .../samza/job/yarn/util/MockContainerUtil.java  |  61 +++
 .../samza/job/yarn/util/MockHttpServer.java     |  55 +++
 .../samza/job/yarn/util/MockNMClient.java       |  50 ++
 .../samza/job/yarn/util/TestAMRMClientImpl.java |  91 ++++
 .../apache/samza/job/yarn/util/TestUtil.java    | 263 +++++++++++
 .../samza/job/yarn/TestSamzaAppMaster.scala     |  90 +++-
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  |  15 +-
 .../job/yarn/TestSamzaAppMasterService.scala    |  56 ++-
 .../yarn/TestSamzaAppMasterTaskManager.scala    | 453 -------------------
 43 files changed, 3681 insertions(+), 970 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 0c38a76..682d4f8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -274,6 +274,10 @@ project(":samza-yarn_$scalaVersion") {
   apply plugin: 'scala'
   apply plugin: 'lesscss'
 
+  // Force scala joint compilation
+  sourceSets.main.scala.srcDir "src/main/java"
+  sourceSets.main.java.srcDirs = []
+
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index c23d7d3..b42c34c 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1317,6 +1317,36 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="yarn-allocator-sleep-ms">yarn.allocator.sleep.ms</td>
+                    <td class="default">3600</td>
+                    <td class="description">
+                        The container allocator thread is responsible for matching requests to allocated containers.
+                        The sleep interval for this thread is configured using this property.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="yarn-samza-host_affinity-enabled">yarn.samza.host-affinity.enabled</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        This property indicates whether host-affinity is enabled or not. Host-affinity refers to the ability of the Application Master to request and allocate a container on the same host every time the job is deployed.
+                        When host-affinity is enabled, the AM makes a "best-effort" attempt to honor the host-affinity constraint.
+                        The property <a href="#yarn-container-request-timeout-ms" class="property">yarn.container.request.timeout.ms</a> determines how long to wait before de-prioritizing the host-affinity constraint and assigning the container to any available resource.
+                        <b>Please Note</b>: This feature is tested to work with the FairScheduler in Yarn when continuous-scheduling is enabled.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="yarn-container-request-timeout-ms">yarn.container.request.timeout.ms</td>
+                    <td class="default">5000</td>
+                    <td class="description">
+                        The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource.
+                        This property determines the number of milliseconds before a container request is considered to have expired / timed-out.
+                        When a request expires, it gets allocated to any available container that was returned by the RM.
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" id="yarn-queue">yarn.queue</td>
                     <td class="default"></td>
                     <td class="description">

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index c567bf4..d19b574 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -68,7 +68,7 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
     for (CoordinatorStreamMessage message: getBootstrappedStream(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping mapping = new SetContainerHostMapping(message);
       Map<String, String> localityMappings = new HashMap<>();
-      localityMappings.put(SetContainerHostMapping.IP_KEY, mapping.getHostLocality());
+      localityMappings.put(SetContainerHostMapping.HOST_KEY, mapping.getHostLocality());
       localityMappings.put(SetContainerHostMapping.JMX_URL_KEY, mapping.getJmxUrl());
       localityMappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, mapping.getJmxTunnelingUrl());
       log.info(String.format("Read locality for container %s: %s", mapping.getKey(), localityMappings));
@@ -78,17 +78,17 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
     return allMappings;
   }
 
-  public void writeContainerToHostMapping(Integer containerId, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) {
+  public void writeContainerToHostMapping(Integer containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
     Map<String, String> existingMappings = containerToHostMapping.get(containerId);
-    String existingIpMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.IP_KEY) : null;
-    if (existingIpMapping != null && !existingIpMapping.equals(hostHttpAddress)) {
-      log.info("Container {} moved from {} to {}", new Object[]{containerId, existingIpMapping, hostHttpAddress});
+    String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
+    if (existingHostMapping != null && !existingHostMapping.equals(hostName)) {
+      log.info("Container {} moved from {} to {}", new Object[]{containerId, existingHostMapping, hostName});
     } else {
-      log.info("Container {} started at {}", containerId, hostHttpAddress);
+      log.info("Container {} started at {}", containerId, hostName);
     }
-    send(new SetContainerHostMapping(getSource() + containerId, String.valueOf(containerId), hostHttpAddress, jmxAddress, jmxTunnelingAddress));
+    send(new SetContainerHostMapping(getSource() + containerId, String.valueOf(containerId), hostName, jmxAddress, jmxTunnelingAddress));
     Map<String, String> mappings = new HashMap<>();
-    mappings.put(SetContainerHostMapping.IP_KEY, hostHttpAddress);
+    mappings.put(SetContainerHostMapping.HOST_KEY, hostName);
     mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress);
     mappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
     containerToHostMapping.put(containerId, mappings);

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
index 5455881..4d093b5 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
@@ -30,7 +30,7 @@ package org.apache.samza.coordinator.stream.messages;
  *     Source: "SamzaContainer-$ContainerId"
  *     MessageMap:
  *     {
- *         ip: InetAddressString,
+ *         host: Name of the host
  *         jmx-url: jmxAddressString
  *         jmx-tunneling-url: jmxTunnelingAddressString
  *     }
@@ -38,7 +38,7 @@ package org.apache.samza.coordinator.stream.messages;
  * */
 public class SetContainerHostMapping extends CoordinatorStreamMessage {
   public static final String TYPE = "set-container-host-assignment";
-  public static final String IP_KEY = "ip";
+  public static final String HOST_KEY = "host";
   public static final String JMX_URL_KEY = "jmx-url";
   public static final String JMX_TUNNELING_URL_KEY = "jmx-tunneling-url";
 
@@ -54,26 +54,21 @@ public class SetContainerHostMapping extends CoordinatorStreamMessage {
    * SteContainerToHostMapping is used to set the container to host mapping information.
    * @param source the source of the message
    * @param key the key which is used to persist the message
-   * @param hostHttpAddress the IP address of the container
+   * @param hostName the hostname of the container
    * @param jmxAddress the JMX address of the container
    * @param jmxTunnelingAddress the JMX tunneling address of the container
    */
-  public SetContainerHostMapping(String source, String key, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) {
+  public SetContainerHostMapping(String source, String key, String hostName, String jmxAddress, String jmxTunnelingAddress) {
     super(source);
     setType(TYPE);
     setKey(key);
-    putMessageValue(IP_KEY, hostHttpAddress);
+    putMessageValue(HOST_KEY, hostName);
     putMessageValue(JMX_URL_KEY, jmxAddress);
     putMessageValue(JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
   }
 
-  /**
-   * Returns the IP address of the container.
-   * @return the container IP address
-   */
   public String getHostLocality() {
-    return getMessageValue(IP_KEY);
-
+    return getMessageValue(HOST_KEY);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index 7b59274..9445a30 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -20,9 +20,11 @@
 package org.apache.samza.job.model;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 
 /**
  * <p>
@@ -42,6 +44,7 @@ public class JobModel {
   private final Map<Integer, ContainerModel> containers;
 
   private final LocalityManager localityManager;
+  private Map<Integer, String> localityMappings = new HashMap<Integer, String>();
 
   public int maxChangeLogStreamPartitions;
 
@@ -54,6 +57,15 @@ public class JobModel {
     this.containers = Collections.unmodifiableMap(containers);
     this.localityManager = localityManager;
 
+    if (localityManager == null) {
+      for (Integer containerId : containers.keySet()) {
+        localityMappings.put(containerId, null);
+      }
+    } else {
+      populateContainerLocalityMappings();
+    }
+
+
     // Compute the number of change log stream partitions as the maximum partition-id
     // of all total number of tasks of the job; Increment by 1 because partition ids
     // start from 0 while we need the absolute count.
@@ -91,6 +103,23 @@ public class JobModel {
     return mappings.get(key);
   }
 
+  private void populateContainerLocalityMappings() {
+    Map<Integer, Map<String, String>> allMappings = localityManager.readContainerLocality();
+    for (Integer containerId: containers.keySet()) {
+      if (allMappings.containsKey(containerId)) {
+        localityMappings.put(containerId, allMappings.get(containerId).get(SetContainerHostMapping.HOST_KEY));
+      } else {
+        localityMappings.put(containerId, null);
+      }
+    }
+  }
+
+  public Map<Integer, String> getAllContainerLocality() {
+    if (localityManager != null) {
+      populateContainerLocalityMappings();
+    }
+    return localityMappings;
+  }
 
   public Map<Integer, ContainerModel> getContainers() {
     return containers;

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 61e228b..f351ad6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -629,10 +629,10 @@ class SamzaContainer(
 
       info("Writing container locality and JMX address to Coordinator Stream")
       try {
-        val hostInetAddress = Util.getLocalHost.getHostAddress
+        val hostInet = Util.getLocalHost
         val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else ""
         val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else ""
-        localityManager.writeContainerToHostMapping(containerContext.id, hostInetAddress, jmxUrl, jmxTunnelingUrl)
+        localityManager.writeContainerToHostMapping(containerContext.id, hostInet.getHostName, jmxUrl, jmxTunnelingUrl)
       } catch {
         case uhe: UnknownHostException =>
           warn("Received UnknownHostException when persisting locality info for container %d: %s" format (containerContext.id, uhe.getMessage))  //No-op

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index c8438bd..948c19a 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -41,6 +41,7 @@ import scala.util.control.Breaks._
 object Util extends Logging {
   val random = new Random
 
+  def clock: Long = System.currentTimeMillis
   /**
    * Make an environment variable string safe to pass.
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
new file mode 100644
index 0000000..a572aa2
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import org.apache.samza.SamzaException;
+
+public class YarnConfig extends MapConfig {
+  /**
+   * (Required) URL from which the job package can be downloaded
+   */
+  public static final String PACKAGE_PATH = "yarn.package.path";
+
+  // Configs related to each yarn container
+  /**
+   * Memory, in megabytes, to request from YARN per container
+   */
+  public static final String CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb";
+  private static final int DEFAULT_CONTAINER_MEM = 1024;
+
+  /**
+   * Number of CPU cores to request from YARN per container
+   */
+  public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores";
+  private static final int DEFAULT_CPU_CORES = 1;
+
+  /**
+   * Maximum number of times the AM tries to restart a failed container
+   */
+  public static final String CONTAINER_RETRY_COUNT = "yarn.container.retry.count";
+  private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
+
+  /**
+   * Determines how frequently a container is allowed to fail before we give up and fail the job
+   */
+  public static final String CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms";
+  private static final int DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000;
+
+  // Configs related to the Samza Application Master (AM)
+  /**
+   * (Optional) JVM options to include in the command line when executing the AM
+   */
+  public static final String AM_JVM_OPTIONS = "yarn.am.opts";
+
+  /**
+   * Determines whether a JMX server should be started on the AM
+   * Default: true
+   */
+  public static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";
+
+  /**
+   * Memory, in megabytes, to request from YARN for running the AM
+   */
+  public static final String AM_CONTAINER_MAX_MEMORY_MB = "yarn.am.container.memory.mb";
+  private static final int DEFAULT_AM_CONTAINER_MAX_MEMORY_MB = 1024;
+
+  /**
+   * Determines the interval for the Heartbeat between the AM and the Yarn RM
+   */
+  public static final String AM_POLL_INTERVAL_MS = "yarn.am.poll.interval.ms";
+  private static final int DEFAULT_POLL_INTERVAL_MS = 1000;
+
+  /**
+   * (Optional) JAVA_HOME path for Samza AM
+   */
+  public static final String AM_JAVA_HOME = "yarn.am.java.home";
+
+  // Configs related to the ContainerAllocator thread
+  /**
+   * Sleep interval for the allocator thread in milliseconds
+   */
+  public static final String ALLOCATOR_SLEEP_MS = "yarn.allocator.sleep.ms";
+  private static final int DEFAULT_ALLOCATOR_SLEEP_MS = 3600;
+  /**
+   * Number of milliseconds before a container request is considered to have expired
+   */
+  public static final String CONTAINER_REQUEST_TIMEOUT_MS = "yarn.container.request.timeout.ms";
+  private static final int DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS = 5000;
+
+  /**
+   * Flag to indicate if host-affinity is enabled for the job or not
+   */
+  public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";
+  private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false;
+
+
+  public YarnConfig(Config config) {
+    super(config);
+  }
+
+  public int getContainerRetryCount() {
+    return getInt(CONTAINER_RETRY_COUNT, DEFAULT_CONTAINER_RETRY_COUNT);
+  }
+
+  public int getContainerRetryWindowMs() {
+    return getInt(CONTAINER_RETRY_WINDOW_MS, DEFAULT_CONTAINER_RETRY_WINDOW_MS);
+  }
+
+  public int getAMPollIntervalMs() {
+    return getInt(AM_POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS);
+  }
+
+  public int getContainerMaxMemoryMb() {
+    return getInt(CONTAINER_MAX_MEMORY_MB, DEFAULT_CONTAINER_MEM);
+  }
+
+  public int getContainerMaxCpuCores() {
+    return getInt(CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES);
+  }
+
+  public boolean getJmxServerEnabled() {
+    return getBoolean(AM_JMX_ENABLED, true);
+  }
+
+  public String getPackagePath() {
+    String packagePath = get(PACKAGE_PATH);
+    if (packagePath == null) {
+      throw new SamzaException("No YARN package path defined in config.");
+    }
+    return packagePath;
+  }
+
+  public int getAMContainerMaxMemoryMb() {
+    return getInt(AM_CONTAINER_MAX_MEMORY_MB, DEFAULT_AM_CONTAINER_MAX_MEMORY_MB);
+  }
+
+  public String getAmOpts() {
+    return get(AM_JVM_OPTIONS, "");
+  }
+
+
+  public String getAMJavaHome() {
+    return get(AM_JAVA_HOME, null);
+  }
+
+  public int getAllocatorSleepTime() {
+    return getInt(ALLOCATOR_SLEEP_MS, DEFAULT_ALLOCATOR_SLEEP_MS);
+  }
+
+  public int getContainerRequestTimeout() {
+    return getInt(CONTAINER_REQUEST_TIMEOUT_MS, DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS);
+  }
+
+  public boolean getHostAffinityEnabled() {
+    return getBoolean(HOST_AFFINITY_ENABLED, DEFAULT_HOST_AFFINITY_ENABLED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
new file mode 100644
index 0000000..eec1708
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class is responsible for requesting containers from the YARN RM and for assigning a container to run on an allocated resource.
+ *
+ * Since we are using a simple thread based allocation of a container to an allocated resource, the subclasses should implement {@link java.lang.Runnable} interface.
+ * The allocator thread follows the lifecycle of the {@link org.apache.samza.job.yarn.SamzaTaskManager}. Depending on whether host-affinity is enabled or not, the allocation model varies.
+ *
+ * See {@link org.apache.samza.job.yarn.ContainerAllocator} and {@link org.apache.samza.job.yarn.HostAwareContainerAllocator}
+ */
+public abstract class AbstractContainerAllocator implements Runnable {
+  public static final String ANY_HOST = ContainerRequestState.ANY_HOST;
+  public static final int DEFAULT_PRIORITY = 0;
+  public static final int DEFAULT_CONTAINER_MEM = 1024;
+  public static final int DEFAULT_CPU_CORES = 1;
+
+  protected final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
+  protected final int ALLOCATOR_SLEEP_TIME;
+  protected final ContainerUtil containerUtil;
+
+  @Override
+  public abstract void run();
+
+  // containerRequestState indicates the state of all unfulfilled container requests and allocated containers
+  protected final ContainerRequestState containerRequestState;
+
+  // state that controls the lifecycle of the allocator thread
+  protected AtomicBoolean isRunning = new AtomicBoolean(true);
+
+  public AbstractContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
+                            ContainerUtil containerUtil,
+                            int allocatorSleepTime,
+                            ContainerRequestState containerRequestState) {
+    this.amClient = amClient;
+    this.containerUtil = containerUtil;
+    this.ALLOCATOR_SLEEP_TIME = allocatorSleepTime;
+    this.containerRequestState = containerRequestState;
+  }
+
+
+  /**
+   * Called during initial request for containers
+   *
+   * @param containerToHostMappings Map of containerId to its last seen host (locality).
+   *                                The locality value is null, either
+   *                                - when host-affinity is not enabled, or
+   *                                - when host-affinity is enabled and job is run for the first time
+   */
+  public void requestContainers(Map<Integer, String> containerToHostMappings) {
+    for (Map.Entry<Integer, String> entry : containerToHostMappings.entrySet()) {
+      int containerId = entry.getKey();
+      String preferredHost = entry.getValue();
+      if (preferredHost == null)
+        preferredHost = ANY_HOST;
+
+      requestContainer(containerId, preferredHost);
+    }
+  }
+  /**
+   * Method to request a container resource from yarn
+   *
+   * @param expectedContainerId Identifier of the container that will be run when a container resource is allocated for
+   *                            this request
+   * @param preferredHost Name of the host that you prefer to run the container on
+   */
+  public final void requestContainer(int expectedContainerId, String preferredHost) {
+    SamzaContainerRequest request = new SamzaContainerRequest(expectedContainerId, preferredHost);
+    containerRequestState.updateRequestState(request);
+  }
+
+  /**
+   * Method that adds an allocated container to the synchronized buffer of allocated containers
+   * See allocatedContainers in {@link org.apache.samza.job.yarn.ContainerRequestState}
+   *
+   * @param container Container resource returned by the RM
+   */
+  public final void addContainer(Container container) {
+    containerRequestState.addContainer(container);
+  }
+
+  public final void setIsRunning(boolean state) {
+    isRunning.set(state);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
new file mode 100644
index 0000000..9911540
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Default allocator thread used by SamzaTaskManager.
+ *
+ * With host-affinity disabled, every pass of this thread pairs pending container requests with whatever container
+ * resources have been allocated so far. Between passes it sleeps for {@code ALLOCATOR_SLEEP_TIME} milliseconds.
+ */
+public class ContainerAllocator extends AbstractContainerAllocator {
+  private static final Logger log = LoggerFactory.getLogger(ContainerAllocator.class);
+
+  public ContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
+                            ContainerUtil containerUtil,
+                            int allocatorSleepTime) {
+    super(amClient, containerUtil, allocatorSleepTime, new ContainerRequestState(amClient, false));
+  }
+
+  /**
+   * Runs until the isRunning flag is cleared. Each pass assigns buffered containers to the requests that are still
+   * in the request queue (See requests in {@link org.apache.samza.job.yarn.ContainerRequestState}), releases any
+   * containers left over once the queue is empty, and then sleeps for ALLOCATOR_SLEEP_TIME ms.
+   *
+   * Since host-affinity is not enabled, all allocated container resources are buffered in the list keyed by "ANY_HOST".
+   * */
+  @Override
+  public void run() {
+    while (isRunning.get()) {
+      try {
+        assignBufferedContainers();
+
+        // If requestQueue is empty, all extra containers in the buffer should be released.
+        containerRequestState.releaseExtraContainers();
+
+        Thread.sleep(ALLOCATOR_SLEEP_TIME);
+      } catch (InterruptedException e) {
+        log.info("Got InterruptedException in AllocatorThread. Pending Container request(s) cannot be fulfilled!!", e);
+      }
+    }
+  }
+
+  /**
+   * Pairs the request at the head of the queue with the first buffered ANY_HOST container, repeating until either
+   * the queue or the buffer is exhausted.
+   */
+  private void assignBufferedContainers() {
+    final List<Container> buffered = containerRequestState.getContainersOnAHost(ANY_HOST);
+    if (buffered == null) {
+      // Nothing has been buffered under ANY_HOST yet.
+      return;
+    }
+
+    while (!containerRequestState.getRequestsQueue().isEmpty() && buffered.size() > 0) {
+      final SamzaContainerRequest pending = containerRequestState.getRequestsQueue().peek();
+      final Container resource = buffered.get(0);
+
+      // Update state
+      containerRequestState.updateStateAfterAssignment(pending, ANY_HOST, resource);
+
+      // Cancel request and run container
+      log.info("Running {} on {}", pending.expectedContainerId, resource.getId());
+      containerUtil.runContainer(pending.expectedContainerId, resource);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java
new file mode 100644
index 0000000..1d15651
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn;
+
+/**
+ * Immutable holder for the failure history of a single container: how often it has failed and when it failed last.
+ * */
+public class ContainerFailure {
+  /**
+   * Number of times a container has failed
+   * */
+  private final int count;
+  /**
+   * Latest failure time of the container. Stored exactly as supplied by the caller, so it may be null.
+   * */
+  private final Long lastFailure;
+
+  /**
+   * @param count Number of times the container has failed
+   * @param lastFailure Latest failure time of the container; stored as given, may be null
+   */
+  public ContainerFailure(int count,
+                          Long lastFailure) {
+    this.count = count;
+    this.lastFailure = lastFailure;
+  }
+
+  /**
+   * @return the number of times the container has failed
+   */
+  public int getCount() {
+    return count;
+  }
+
+  /**
+   * @return the latest failure time as passed to the constructor, possibly null
+   */
+  public Long getLastFailure() {
+    return lastFailure;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
new file mode 100644
index 0000000..b5e0368
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class maintains the state variables for all the container requests and the allocated containers returned
+ * by the RM
+ * Important: Even though we use concurrent data structures, this class is not thread-safe. Thread safety has to be
+ * handled by the caller.
+ */
+public class ContainerRequestState {
+  private static final Logger log = LoggerFactory.getLogger(ContainerRequestState.class);
+  public static final String ANY_HOST = "ANY_HOST";
+
+  /**
+   * Maintain a map of hostname to a list of containers allocated on this host
+   */
+  private final ConcurrentHashMap<String, List<Container>> allocatedContainers = new ConcurrentHashMap<String, List<Container>>();
+  /**
+   * Represents the queue of container requests made by the {@link org.apache.samza.job.yarn.SamzaTaskManager}
+   */
+  private final PriorityBlockingQueue<SamzaContainerRequest> requestsQueue = new PriorityBlockingQueue<SamzaContainerRequest>();
+  /**
+   * Maintain a map of hostname to the number of requests made for containers on this host
+   * This state variable is used to look-up whether an allocated container on a host was ever requested in the past.
+   * This map is not updated when host-affinity is not enabled
+   */
+  private final ConcurrentHashMap<String, AtomicInteger> requestsToCountMap = new ConcurrentHashMap<String, AtomicInteger>();
+  /**
+   * Indicates whether host-affinity is enabled or not
+   */
+  private final boolean hostAffinityEnabled;
+
+  private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
+
+  // TODO: Refactor such that the state class for host-affinity enabled allocator is a subclass of a generic state class
+  public ContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
+                               boolean hostAffinityEnabled) {
+    this.amClient = amClient;
+    this.hostAffinityEnabled = hostAffinityEnabled;
+  }
+
+  /**
+   * This method is called every time {@link org.apache.samza.job.yarn.SamzaTaskManager} requests a container.
+   * It forwards the issued request to the RM client and adds the
+   * {@link org.apache.samza.job.yarn.SamzaContainerRequest} to the requestsQueue.
+   * If host-affinity is enabled, it updates the requestsToCountMap as well.
+   *
+   * @param request {@link org.apache.samza.job.yarn.SamzaContainerRequest} to send to the RM and track
+   */
+  public synchronized void updateRequestState(SamzaContainerRequest request) {
+
+    log.info("Requesting a container for {} at {}", request.getExpectedContainerId(), request.getPreferredHost());
+    amClient.addContainerRequest(request.getIssuedRequest());
+
+    requestsQueue.add(request);
+    String preferredHost = request.getPreferredHost();
+    if (hostAffinityEnabled) {
+      AtomicInteger requestCount = requestsToCountMap.get(preferredHost);
+      if (requestCount != null) {
+        requestCount.incrementAndGet();
+      } else {
+        requestsToCountMap.put(preferredHost, new AtomicInteger(1));
+      }
+      /**
+       * The following is important to correlate allocated container data with the requests made before. If
+       * the preferredHost is requested for the first time, the state should reflect that the allocatedContainers
+       * list is empty and NOT null.
+       */
+      if (!allocatedContainers.containsKey(preferredHost)) {
+        allocatedContainers.put(preferredHost, new ArrayList<Container>());
+      }
+    }
+  }
+
+  /**
+   * This method is called every time the RM returns an allocated container.
+   * Adds the allocated container resource to the correct allocatedContainers buffer: the buffer of its own host when
+   * host-affinity is enabled and that host still has unmatched requests, the ANY_HOST buffer otherwise.
+   * @param container Container resource that was returned by the RM
+   */
+  public synchronized void addContainer(Container container) {
+    if (!hostAffinityEnabled) {
+      log.debug("Saving the container {} in the buffer for ANY_HOST", container.getId());
+      addToAllocatedContainerList(ANY_HOST, container);
+      return;
+    }
+
+    // The node HTTP address has the form host:port; only the host part is used as the buffer key
+    String hostName = container.getNodeHttpAddress().split(":")[0];
+    AtomicInteger requestCount = requestsToCountMap.get(hostName);
+    // Read the counter once so that every decision below is based on the same value
+    int requestCountOnThisHost = (requestCount == null) ? 0 : requestCount.get();
+
+    // Check if this host was requested for any of the containers
+    if (requestCount == null || requestCountOnThisHost == 0) {
+      log.debug(
+          "Request count for the allocatedContainer on {} is null or 0. This means that the host was not requested " +
+              "for running containers.Hence, saving the container {} in the buffer for ANY_HOST",
+          hostName,
+          container.getId()
+      );
+      addToAllocatedContainerList(ANY_HOST, container);
+      return;
+    }
+
+    if (requestCountOnThisHost < 0) {
+      // Only reachable if the counter was decremented more often than incremented.
+      // The message has a single placeholder, so only the container id is passed.
+      log.debug(
+          "This host was never requested. Hence, saving the container {} in the buffer for ANY_HOST",
+          container.getId()
+      );
+      addToAllocatedContainerList(ANY_HOST, container);
+      return;
+    }
+
+    List<Container> allocatedContainersOnThisHost = allocatedContainers.get(hostName);
+    if (allocatedContainersOnThisHost == null || allocatedContainersOnThisHost.size() < requestCountOnThisHost) {
+      log.debug("Saving the container {} in the buffer for {}", container.getId(), hostName);
+      addToAllocatedContainerList(hostName, container);
+    } else {
+      /**
+       * The RM may allocate more containers on a given host than requested. In such a case, even though the
+       * requestCount != 0, the number of containers already buffered for that host has reached the request count.
+       * Hence, the container should be assigned to ANY_HOST
+       */
+      log.debug(
+          "The number of containers already allocated on {} is greater than what was " +
+              "requested, which is {}. Hence, saving the container {} in the buffer for ANY_HOST",
+          new Object[]{
+              hostName,
+              requestCountOnThisHost,
+              container.getId()
+          }
+      );
+      addToAllocatedContainerList(ANY_HOST, container);
+    }
+  }
+
+  // Update the allocatedContainers list, creating the per-host list on first use
+  private void addToAllocatedContainerList(String host, Container container) {
+    List<Container> list = allocatedContainers.get(host);
+    if (list == null) {
+      list = new ArrayList<Container>();
+      allocatedContainers.put(host, list);
+    }
+    list.add(container);
+  }
+
+  /**
+   * This method updates the state after a request is fulfilled and a container starts running on a host
+   * Needs to be synchronized because the state buffers are populated by the AMRMCallbackHandler, whereas it is drained by the allocator thread
+   *
+   * @param request {@link org.apache.samza.job.yarn.SamzaContainerRequest} that was fulfilled
+   * @param assignedHost  Host to which the container was assigned
+   * @param container Allocated container resource that was used to satisfy this request
+   */
+  public synchronized void updateStateAfterAssignment(SamzaContainerRequest request, String assignedHost, Container container) {
+    requestsQueue.remove(request);
+    allocatedContainers.get(assignedHost).remove(container);
+    if (hostAffinityEnabled) {
+      // assignedHost may not always be the preferred host.
+      // Hence, we should safely decrement the counter for the preferredHost
+      requestsToCountMap.get(request.getPreferredHost()).decrementAndGet();
+    }
+    // To avoid getting back excess containers
+    amClient.removeContainerRequest(request.getIssuedRequest());
+  }
+
+  /**
+   * If requestQueue is empty, all extra containers in the buffer should be released and update the entire system's state
+   * Needs to be synchronized because it is modifying shared state buffers
+   */
+  public synchronized void releaseExtraContainers() {
+    if (!requestsQueue.isEmpty()) {
+      // There are still unfulfilled requests, so buffered containers may yet be needed
+      return;
+    }
+
+    if (hostAffinityEnabled) {
+      log.info("Requests Queue is empty. Should clear up state.");
+
+      List<String> allocatedHosts = getAllocatedHosts();
+      for (String host : allocatedHosts) {
+        List<Container> containers = getContainersOnAHost(host);
+        if (containers != null) {
+          for (Container c : containers) {
+            log.info("Releasing extra container {} allocated on {}", c.getId(), host);
+            amClient.releaseAssignedContainer(c.getId());
+          }
+        }
+      }
+    } else {
+      log.info("No more pending requests in queue.");
+
+      List<Container> availableContainers = getContainersOnAHost(ANY_HOST);
+      while (availableContainers != null && !availableContainers.isEmpty()) {
+        Container c = availableContainers.remove(0);
+        log.info("Releasing extra allocated container - {}", c.getId());
+        amClient.releaseAssignedContainer(c.getId());
+      }
+    }
+    clearState();
+  }
+
+  /**
+   * Clears all the state variables
+   * Performed when there are no more unfulfilled requests
+   */
+  private void clearState() {
+    allocatedContainers.clear();
+    requestsToCountMap.clear();
+    requestsQueue.clear();
+  }
+
+  /**
+   * Returns the list of hosts which has at least 1 allocatedContainer in the buffer
+   * @return list of host names
+   */
+  private List<String> getAllocatedHosts() {
+    List<String> hostKeys = new ArrayList<String>();
+    for (Map.Entry<String, List<Container>> entry : allocatedContainers.entrySet()) {
+      if (!entry.getValue().isEmpty()) {
+        hostKeys.add(entry.getKey());
+      }
+    }
+    return hostKeys;
+  }
+
+  /**
+   * Returns the list of containers allocated on a given host
+   * If no buffer exists for the given host, it returns null.
+   * The returned list is the internal buffer itself, not a copy.
+   * @param host hostname
+   * @return list of containers allocated on the given host, or null
+   */
+  public List<Container> getContainersOnAHost(String host) {
+    return allocatedContainers.get(host);
+  }
+
+  public PriorityBlockingQueue<SamzaContainerRequest> getRequestsQueue() {
+    return requestsQueue;
+  }
+
+  public ConcurrentHashMap<String, AtomicInteger> getRequestsToCountMap() {
+    return requestsToCountMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
new file mode 100644
index 0000000..2f43501
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.config.YarnConfig;
+import org.apache.samza.job.CommandBuilder;
+import org.apache.samza.job.ShellCommandBuilder;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Helper that turns an allocated YARN container into a running Samza container: it builds the launch command and
+ * environment, prepares the launch context and asks the node manager client to start the container.
+ */
+public class ContainerUtil {
+  private static final Logger log = LoggerFactory.getLogger(ContainerUtil.class);
+
+  private final Config config;
+  private final SamzaAppState state;
+  private final YarnConfiguration yarnConfiguration;
+
+  private NMClient nmClient;
+  private final YarnConfig yarnConfig;
+  private final TaskConfig taskConfig;
+
+  public ContainerUtil(Config config,
+                       SamzaAppState state,
+                       YarnConfiguration yarnConfiguration) {
+    this.config = config;
+    this.state = state;
+    this.yarnConfiguration = yarnConfiguration;
+
+    this.nmClient = NMClient.createNMClient();
+    nmClient.init(this.yarnConfiguration);
+
+    this.yarnConfig = new YarnConfig(config);
+    this.taskConfig = new TaskConfig(config);
+  }
+
+  // Replaces the node manager client created in the constructor (presumably for tests - confirm with callers)
+  protected void setNmClient(NMClient nmClient){
+    this.nmClient = nmClient;
+  }
+
+  /**
+   * Launches the Samza container with the given id on the allocated YARN container and records it in the
+   * application state.
+   *
+   * @param samzaContainerId Identifier of the Samza container to run
+   * @param container Allocated YARN container resource to run it on
+   * @throws SamzaException if the package cannot be inspected or the container cannot be started
+   */
+  public void runContainer(int samzaContainerId, Container container) {
+    String containerIdStr = ConverterUtils.toString(container.getId());
+    log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);
+
+    // Use the configured command builder, falling back to the shell command builder
+    String cmdBuilderClassName;
+    if (taskConfig.getCommandClass().isDefined()) {
+      cmdBuilderClassName = taskConfig.getCommandClass().get();
+    } else {
+      cmdBuilderClassName = ShellCommandBuilder.class.getName();
+    }
+    CommandBuilder cmdBuilder = (CommandBuilder) Util.getObj(cmdBuilderClassName);
+    cmdBuilder
+        .setConfig(config)
+        .setId(samzaContainerId)
+        .setUrl(state.coordinatorUrl);
+
+    String command = cmdBuilder.buildCommand();
+    log.info("Container ID {} using command {}", samzaContainerId, command);
+
+    log.info("Container ID {} using environment variables: ", samzaContainerId);
+    Map<String, String> env = new HashMap<String, String>();
+    for (Map.Entry<String, String> entry: cmdBuilder.buildEnvironment().entrySet()) {
+      String escapedValue = Util.envVarEscape(entry.getValue());
+      env.put(entry.getKey(), escapedValue);
+      log.info("{}={} ", entry.getKey(), escapedValue);
+    }
+
+    Path path = new Path(yarnConfig.getPackagePath());
+    // SLF4J only substitutes "{}" placeholders; a printf-style "%s" would be printed literally
+    log.info("Starting container ID {} using package path {}", samzaContainerId, path);
+
+    startContainer(
+        path,
+        container,
+        env,
+        getFormattedCommand(
+            ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+            command,
+            ApplicationConstants.STDOUT,
+            ApplicationConstants.STDERR)
+    );
+
+    // The job is marked healthy once the last needed container has been started
+    if (state.neededContainers.decrementAndGet() == 0) {
+      state.jobHealthy.set(true);
+    }
+    state.runningContainers.put(samzaContainerId, new YarnContainer(container));
+
+    log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).",
+        new Object[]{
+            samzaContainerId,
+            containerIdStr,
+            container.getNodeId().getHost(),
+            container.getNodeHttpAddress(),
+            containerIdStr}
+    );
+
+    log.info("Started container ID {}", samzaContainerId);
+  }
+
+  /**
+   * Builds the launch context (package resource, security tokens, environment and command) and starts the
+   * container through the node manager client.
+   *
+   * @param packagePath Location of the job package that is localized as "__package"
+   * @param container Allocated YARN container resource to start
+   * @param env Environment variables for the container process
+   * @param cmd Shell command that is run inside the container
+   * @throws SamzaException if the package status cannot be read, the credentials cannot be serialized, or the node
+   *                        manager client fails to start the container; the original exception is kept as the cause
+   */
+  protected void startContainer(Path packagePath,
+                                Container container,
+                                Map<String, String> env,
+                                final String cmd) {
+    log.info("starting container {} {} {} {}",
+        new Object[]{packagePath, container, env, cmd});
+
+    // set the local package so that the containers and app master are provisioned with it
+    LocalResource packageResource = Records.newRecord(LocalResource.class);
+    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
+    FileStatus fileStatus;
+    try {
+      fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
+    } catch (IOException ioe) {
+      log.error("IO Exception when accessing the package status from the filesystem", ioe);
+      throw new SamzaException("IO Exception when accessing the package status from the filesystem", ioe);
+    }
+
+    packageResource.setResource(packageUrl);
+    packageResource.setSize(fileStatus.getLen());
+    packageResource.setTimestamp(fileStatus.getModificationTime());
+    packageResource.setType(LocalResourceType.ARCHIVE);
+    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+    ByteBuffer allTokens;
+    // copy tokens (copied from dist shell example)
+    try {
+      Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+
+      // now remove the AM->RM token so that containers cannot access it.
+      // getAllTokens() yields Token objects rather than TokenIdentifiers, so the kind is read from the Token itself;
+      // casting a Token to TokenIdentifier fails with a ClassCastException.
+      // NOTE(review): the credentials were already serialized into dob above, so this removal does not change the
+      // bytes handed to the container - confirm whether the removal is meant to happen before serialization.
+      Iterator<org.apache.hadoop.security.token.Token<? extends TokenIdentifier>> iter =
+          credentials.getAllTokens().iterator();
+      while (iter.hasNext()) {
+        org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token = iter.next();
+        if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+          iter.remove();
+        }
+      }
+      allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    } catch (IOException ioe) {
+      log.error("IO Exception when writing credentials to output buffer", ioe);
+      throw new SamzaException("IO Exception when writing credentials to output buffer", ioe);
+    }
+
+    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
+    context.setEnvironment(env);
+    context.setTokens(allTokens.duplicate());
+    context.setCommands(Collections.singletonList(cmd));
+    context.setLocalResources(Collections.singletonMap("__package", packageResource));
+
+    log.debug("setting package to {}", packageResource);
+    log.debug("setting context to {}", context);
+
+    // NOTE(review): this request record is populated but never handed to nmClient; the container is started
+    // directly from the launch context below - confirm whether it can be dropped.
+    StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class);
+    startContainerRequest.setContainerLaunchContext(context);
+    try {
+      nmClient.startContainer(container, context);
+    } catch (YarnException ye) {
+      log.error("Received YarnException when starting container: " + container.getId(), ye);
+      throw new SamzaException("Received YarnException when starting container: " + container.getId(), ye);
+    } catch (IOException ioe) {
+      log.error("Received IOException when starting container: " + container.getId(), ioe);
+      throw new SamzaException("Received IOException when starting container: " + container.getId(), ioe);
+    }
+  }
+
+  // Wraps the command so that SAMZA_LOG_DIR is exported, "logs" links to the log directory, and stdout/stderr
+  // are redirected into files under it
+  private String getFormattedCommand(String logDirExpansionVar,
+                                     String command,
+                                     String stdOut,
+                                     String stdErr) {
+    return "export SAMZA_LOG_DIR=" + logDirExpansionVar + " && ln -sfn " + logDirExpansionVar +
+        " logs && exec ./__package/" + command + " 1>logs/" + stdOut + " 2>logs/" + stdErr;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
new file mode 100644
index 0000000..e3b5868
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the allocator thread that will be used by SamzaTaskManager when host-affinity is enabled for a job. It is similar to {@link org.apache.samza.job.yarn.ContainerAllocator}, except that it considers container locality for allocation.
+ *
+ * In case of host-affinity, each container request ({@link org.apache.samza.job.yarn.SamzaContainerRequest} encapsulates the identifier of the container to be run and a "preferredHost". preferredHost is determined by the locality mappings in the coordinator stream.
+ * This thread periodically wakes up and makes the best-effort to assign a container to the preferredHost. If the preferredHost is not returned by the RM before the corresponding container expires, the thread assigns the container to any other host that is allocated next.
+ * The container expiry is determined by CONTAINER_REQUEST_TIMEOUT and is configurable on a per-job basis.
+ *
+ * If there aren't enough containers, it waits by sleeping for ALLOCATOR_SLEEP_TIME milliseconds.
+ */
+public class HostAwareContainerAllocator extends AbstractContainerAllocator {
+  private static final Logger log = LoggerFactory.getLogger(HostAwareContainerAllocator.class);
+
+  private final int CONTAINER_REQUEST_TIMEOUT;
+
+  public HostAwareContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
+                                     ContainerUtil containerUtil,
+                                     int allocatorSleepTime,
+                                     int containerRequestTimeout) {
+    super(amClient, containerUtil, allocatorSleepTime, new ContainerRequestState(amClient, true));
+    this.CONTAINER_REQUEST_TIMEOUT = containerRequestTimeout;
+  }
+
+  /**
+   * Since host-affinity is enabled, all allocated container resources are buffered in the list keyed by "preferredHost".
+   *
+   * If the requested host is not available, the thread checks to see if the request has expired.
+   * If it has expired, it runs the container with expectedContainerID on one of the available hosts from the
+   * allocatedContainers buffer keyed by "ANY_HOST".
+   */
+  @Override
+  public void run() {
+    try {
+      while (isRunning.get()) {
+        while (!containerRequestState.getRequestsQueue().isEmpty()) {
+          SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
+          String preferredHost = request.getPreferredHost();
+          int expectedContainerId = request.getExpectedContainerId();
+
+          log.info(
+              "Handling request for container id {} on preferred host {}",
+              expectedContainerId,
+              preferredHost);
+
+          List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(preferredHost);
+          if (allocatedContainers != null && allocatedContainers.size() > 0) {
+            // Found allocated container at preferredHost
+            Container container = allocatedContainers.get(0);
+
+            containerRequestState.updateStateAfterAssignment(request, preferredHost, container);
+
+            log.info("Running {} on {}", expectedContainerId, container.getId());
+            containerUtil.runContainer(expectedContainerId, container);
+          } else {
+            // No allocated container on preferredHost
+            log.info(
+                "Did not find any allocated containers on preferred host {} for running container id {}",
+                preferredHost,
+                expectedContainerId);
+            boolean expired = requestExpired(request);
+            allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
+            if (!expired || allocatedContainers == null || allocatedContainers.size() == 0) {
+              log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't " +
+                      "find any free allocated containers in the buffer. Breaking out of loop.",
+                  request.getRequestTimestamp(),
+                  CONTAINER_REQUEST_TIMEOUT);
+              break;
+            } else {
+              if (allocatedContainers.size() > 0) {
+                Container container = allocatedContainers.get(0);
+                log.info("Found available containers on ANY_HOST. Assigning request for container_id {} with " +
+                        "timestamp {} to container {}",
+                    new Object[] { String.valueOf(expectedContainerId), request.getRequestTimestamp(), container.getId()
+                });
+                containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
+                log.info("Running {} on {}", expectedContainerId, container.getId());
+                containerUtil.runContainer(expectedContainerId, container);
+              }
+            }
+          }
+        }
+        // Release extra containers and update the entire system's state
+        containerRequestState.releaseExtraContainers();
+
+        Thread.sleep(ALLOCATOR_SLEEP_TIME);
+      }
+    } catch (InterruptedException ie) {
+      log.info("Got an InterruptedException in HostAwareContainerAllocator thread!", ie);
+    } catch (Exception e) {
+      log.info("Got an unknown Exception in HostAwareContainerAllocator thread!", e);
+    }
+  }
+
+  private boolean requestExpired(SamzaContainerRequest request) {
+    return System.currentTimeMillis() - request.getRequestTimestamp() > CONTAINER_REQUEST_TIMEOUT;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
new file mode 100644
index 0000000..d5be36e
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.samza.coordinator.JobCoordinator;
+
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** State of the Samza application master: values referenced by the AM web services and state used by the TaskManager. */
+public class SamzaAppState {
+  /**
+   * Job Coordinator is started in the AM and follows the {@link org.apache.samza.job.yarn.SamzaAppMasterService}
+   * lifecycle. It helps querying JobModel related info in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
+   * and locality information when host-affinity is enabled in {@link org.apache.samza.job.yarn.SamzaTaskManager}
+   */
+  public final JobCoordinator jobCoordinator;
+
+  /*  The following state variables are primarily used for reference in the AM web services   */
+  /**
+   * Task Id of the AM
+   * Used for displaying in the AM UI. Usage found in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
+   * and scalate/WEB-INF/views/index.scaml
+   */
+  public final int taskId;
+  /**
+   * Id of the AM container (as allocated by the RM)
+   * Used for displaying in the AM UI. Usage in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
+   * and scalate/WEB-INF/views/index.scaml
+   */
+  public final ContainerId amContainerId;
+  /**
+   * Host name of the NM on which the AM is running
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   */
+  public final String nodeHost;
+  /**
+   * NM port on which the AM is running
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   */
+  public final int nodePort;
+  /**
+   * Http port of the NM on which the AM is running
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   */
+  public final int nodeHttpPort;
+  /**
+   * Application Attempt Id as provided by Yarn
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   * and {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
+   */
+  public final ApplicationAttemptId appAttemptId;
+  /**
+   * JMX Server URL, if enabled
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   */
+  public String jmxUrl = "";
+  /**
+   * JMX Server Tunneling URL, if enabled
+   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
+   */
+  public String jmxTunnelingUrl = "";
+  /**
+   * Job Coordinator URL
+   * Usage in {@link org.apache.samza.job.yarn.SamzaAppMasterService} &amp; ContainerUtil
+   */
+  public URL coordinatorUrl = null;
+  /**
+   * URL of the {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
+   */
+  public URL rpcUrl = null;
+  /**
+   * URL of the {@link org.apache.samza.webapp.ApplicationMasterWebServlet}
+   */
+  public URL trackingUrl = null;
+
+  /**
+   * The following state variables are required for the correct functioning of the TaskManager
+   * Some of them are shared between the AMRMCallbackThread and the ContainerAllocator thread, as mentioned below.
+   */
+
+  /**
+   * Number of containers that have completed their execution and exited successfully
+   */
+  public AtomicInteger completedContainers = new AtomicInteger(0);
+
+  /**
+   * Number of failed containers
+   * */
+  public AtomicInteger failedContainers = new AtomicInteger(0);
+
+  /**
+   * Number of containers released due to extra allocation returned by the RM
+   */
+  public AtomicInteger releasedContainers = new AtomicInteger(0);
+
+  /**
+   * Number of containers configured for the job
+   */
+  public int containerCount = 0;
+
+  /**
+   * Set of finished containers - TODO: Can be changed to a counter. NOTE(review): plain HashSet, not thread-safe - confirm single-threaded use
+   */
+  public Set<Integer> finishedContainers = new HashSet<Integer>();
+
+  /**
+   *  Number of containers needed for the job to be declared healthy
+   *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
+   */
+  public AtomicInteger neededContainers = new AtomicInteger(0);
+
+  /**
+   *  Map of the samzaContainerId to the {@link org.apache.samza.job.yarn.YarnContainer} on which it is running
+   *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
+   */
+  public ConcurrentMap<Integer, YarnContainer> runningContainers = new ConcurrentHashMap<Integer, YarnContainer>(0);
+
+  /**
+   * Final status of the application
+   * Modified by both the AMRMCallbackThread and the ContainerAllocator thread; volatile so every thread sees the latest write
+   */
+  public volatile FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
+
+  /**
+   * State indicating whether the job is healthy or not
+   * Modified by both the AMRMCallbackThread and the ContainerAllocator thread
+   */
+  public AtomicBoolean jobHealthy = new AtomicBoolean(true);
+
+  public SamzaAppState(JobCoordinator jobCoordinator,
+                       int taskId,
+                       ContainerId amContainerId,
+                       String nodeHost,
+                       int nodePort,
+                       int nodeHttpPort) {
+    this.jobCoordinator = jobCoordinator;
+    this.taskId = taskId;
+    this.amContainerId = amContainerId;
+    this.nodeHost = nodeHost;
+    this.nodePort = nodePort;
+    this.nodeHttpPort = nodeHttpPort;
+    this.appAttemptId = amContainerId.getApplicationAttemptId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
new file mode 100644
index 0000000..9441d77
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+
+/**
+ * SamzaContainerRequest encapsulate the ContainerRequest and the meta-information of the request.
+ */
+public class SamzaContainerRequest implements Comparable<SamzaContainerRequest> {
+  private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
+
+  private final Priority priority;
+  private final Resource capability;
+
+  /**
+   *  If host-affinity is enabled, the request specifies a preferred host for the container
+   *  If not, preferredHost defaults to ANY_HOST
+   */
+  private final String preferredHost;
+  // Timestamp at which the request is issued. Used to check request expiry
+  private final Long requestTimestamp;
+  // Actual Container Request that is issued to the RM
+  public AMRMClient.ContainerRequest issuedRequest;
+  // Container Id that is expected to run on the container returned for this request
+  public int expectedContainerId;
+
+  /**
+   * Builds the underlying YARN container request and stamps this request with the current time.
+   * A null preferredHost means "no preference": it is recorded as ANY_HOST and no node is named in the request.
+   */
+  public SamzaContainerRequest(int memoryMb, int cpuCores, int priority, int expectedContainerId, String preferredHost) {
+    this.capability = Resource.newInstance(memoryMb, cpuCores);
+    this.priority = Priority.newInstance(priority);
+    this.expectedContainerId = expectedContainerId;
+    if (preferredHost == null) {
+      this.preferredHost = ANY_HOST;
+      this.issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, this.priority);
+    } else {
+      this.preferredHost = preferredHost;
+      this.issuedRequest = new AMRMClient.ContainerRequest(capability, new String[]{preferredHost}, null, this.priority);
+    }
+
+    this.requestTimestamp = System.currentTimeMillis();
+  }
+
+  public SamzaContainerRequest(int expectedContainerId, String preferredHost) {
+    this(
+        AbstractContainerAllocator.DEFAULT_CONTAINER_MEM,
+        AbstractContainerAllocator.DEFAULT_CPU_CORES,
+        AbstractContainerAllocator.DEFAULT_PRIORITY,
+        expectedContainerId,
+        preferredHost);
+  }
+
+  public Resource getCapability() {
+    return capability;
+  }
+
+  public Priority getPriority() {
+    return priority;
+  }
+
+  public AMRMClient.ContainerRequest getIssuedRequest() {
+    return issuedRequest;
+  }
+
+  public int getExpectedContainerId() {
+    return expectedContainerId;
+  }
+
+  public String getPreferredHost() {
+    return preferredHost;
+  }
+
+  public Long getRequestTimestamp() {
+    return requestTimestamp;
+  }
+
+  @Override
+  public String toString() {
+    return "[requestIssueTime=" + requestTimestamp + ", issuedRequest=" + issuedRequest + "]";
+  }
+
+  /**
+   * Orders requests by issue time, oldest first. NOTE: equals() is not overridden, so two distinct requests
+   * issued in the same millisecond compare as 0 without being equal.
+   */
+  @Override
+  public int compareTo(SamzaContainerRequest o) {
+    return requestTimestamp.compareTo(o.requestTimestamp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c76132ff/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
new file mode 100644
index 0000000..c2a9ac0
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.YarnConfig;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static junit.framework.Assert.assertTrue;
+
+/**
+ * Samza's application master is mostly interested in requesting containers to
+ * run Samza jobs. SamzaTaskManager is responsible for requesting new
+ * containers, handling failures, and notifying the application master that the
+ * job is done.
+ *
+ * The following main threads are involved in the execution of the Samza AM:
+ *  - The main thread (defined in SamzaAppMaster) that drives the AM to send out container requests to RM
+ *  - The callback handler thread that receives the responses from RM and handles:
+ *      - Populating a buffer when a container is allocated by the RM
+ *        (allocatedContainers in {@link org.apache.samza.job.yarn.ContainerRequestState})
+ *      - Identifying the cause of container failure and re-requesting containers from RM by adding request to the
+ *        internal requestQueue in {@link org.apache.samza.job.yarn.ContainerRequestState}
+ *  - The allocator thread defined here assigns the allocated containers to pending requests
+ *    (See {@link org.apache.samza.job.yarn.ContainerAllocator} or {@link org.apache.samza.job.yarn.HostAwareContainerAllocator})
+ */
+
+class SamzaTaskManager implements YarnAppMasterListener {
+  private static final Logger log = LoggerFactory.getLogger(SamzaTaskManager.class);
+
+  private final boolean hostAffinityEnabled;
+  private final SamzaAppState state;
+
+  // Derived configs
+  private final JobConfig jobConfig;
+  private final YarnConfig yarnConfig;
+
+  private final AbstractContainerAllocator containerAllocator;
+  private final Thread allocatorThread;
+
+  // State
+  // NOTE(review): if shouldShutdown() is polled by a thread other than the RM callback thread, this flag needs volatile
+  private boolean tooManyFailedContainers = false;
+  // Failure count and last failure time per Samza container id, consulted for the retry-count / retry-window check
+  private Map<Integer, ContainerFailure> containerFailures = new HashMap<Integer, ContainerFailure>();
+
+  /** Derives the job and YARN configs and creates the allocator: host-aware if host-affinity is enabled, plain otherwise. */
+  public SamzaTaskManager(Config config, SamzaAppState state,
+                          AMRMClientAsync<AMRMClient.ContainerRequest> amClient, YarnConfiguration conf) {
+    this.state = state;
+    this.jobConfig = new JobConfig(config);
+    this.yarnConfig = new YarnConfig(config);
+
+    this.hostAffinityEnabled = yarnConfig.getHostAffinityEnabled();
+
+    if (this.hostAffinityEnabled) {
+      this.containerAllocator = new HostAwareContainerAllocator(
+          amClient, new ContainerUtil(config, state, conf),
+          yarnConfig.getAllocatorSleepTime(), yarnConfig.getContainerRequestTimeout());
+    } else {
+      this.containerAllocator = new ContainerAllocator(
+          amClient, new ContainerUtil(config, state, conf), yarnConfig.getAllocatorSleepTime());
+    }
+
+    this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
+  }
+
+  /** True once too many containers have failed, every container has completed, or the allocator thread is not alive. */
+  @Override
+  public boolean shouldShutdown() {
+    return tooManyFailedContainers || state.completedContainers.get() == state.containerCount || !allocatorThread.isAlive();
+  }
+
+  /** Records the configured container count, requests the initial set of containers and starts the allocator thread. */
+  @Override
+  public void onInit() {
+    state.containerCount = jobConfig.getContainerCount();
+    state.neededContainers.set(state.containerCount);
+
+    // Request initial set of containers
+    Map<Integer, String> containerToHostMapping = state.jobCoordinator.jobModel().getAllContainerLocality();
+    // Sanity check done with a plain Java exception so that runtime code does not rely on JUnit's assertTrue
+    if (containerToHostMapping.size() != state.containerCount) {
+      throw new IllegalStateException("Expected locality for " + state.containerCount + " containers, but got " + containerToHostMapping.size());
+    }
+    containerAllocator.requestContainers(containerToHostMapping);
+
+    // Start container allocator thread
+    log.info("Starting the container allocator thread");
+    allocatorThread.start();
+  }
+
+  @Override
+  public void onReboot() {
+    // Intentionally empty: no action is taken on reboot
+  }
+
+  /** Signals the allocator to stop and waits for the allocator thread to finish. */
+  @Override
+  public void onShutdown() {
+    // Shutdown allocator thread
+    containerAllocator.setIsRunning(false);
+    try {
+      allocatorThread.join();
+    } catch (InterruptedException ie) {
+      log.info("Allocator Thread join() threw an interrupted exception", ie);
+      Thread.currentThread().interrupt(); // restore the interrupt flag so the caller can still react to it
+    }
+  }
+
+  /** Passes a container allocated by the RM on to the container allocator. */
+  @Override
+  public void onContainerAllocated(Container container) {
+    containerAllocator.addContainer(container);
+  }
+
+  /**
+   * This method handles the onContainerCompleted callback from the RM. Based on the ContainerExitStatus, it decides
+   * whether a container that exited is marked as complete or failure.
+   */
+  @Override
+  public void onContainerCompleted(ContainerStatus containerStatus) {
+    String containerIdStr = ConverterUtils.toString(containerStatus.getContainerId());
+    int containerId = -1;
+    for(Map.Entry<Integer, YarnContainer> entry: state.runningContainers.entrySet()) {
+      if(entry.getValue().id().equals(containerStatus.getContainerId())) {
+        containerId = entry.getKey();
+        break;
+      }
+    }
+    state.runningContainers.remove(containerId);
+
+    int exitStatus = containerStatus.getExitStatus();
+    switch(exitStatus) {
+      case ContainerExitStatus.SUCCESS:
+        log.info("Container {} completed successfully.", containerIdStr);
+
+        state.completedContainers.incrementAndGet();
+
+        if (containerId != -1) {
+          state.finishedContainers.add(containerId);
+          containerFailures.remove(containerId);
+        }
+
+        if (state.completedContainers.get() == state.containerCount) {
+          log.info("Setting job status to SUCCEEDED, since all containers have been marked as completed.");
+          state.status = FinalApplicationStatus.SUCCEEDED;
+        }
+        break;
+
+      case ContainerExitStatus.DISKS_FAILED:
+      case ContainerExitStatus.ABORTED:
+      case ContainerExitStatus.PREEMPTED:
+        log.info("Got an exit code of {}. This means that container {} was "
+            + "killed by YARN, either due to being released by the application "
+            + "master or being 'lost' due to node failures etc. or due to preemption by the RM",
+            exitStatus,
+            containerIdStr);
+
+        state.releasedContainers.incrementAndGet();
+
+        // If this container was assigned some partitions (a containerId), then
+        // clean up, and request a new container for the tasks. This only
+        // should happen if the container was 'lost' due to node failure, not
+        // if the AM released the container.
+        if (containerId != -1) {
+          log.info("Released container {} was assigned task group ID {}. Requesting a new container for the task group.", containerIdStr, containerId);
+
+          state.neededContainers.incrementAndGet();
+          state.jobHealthy.set(false);
+
+          // request a container on new host
+          containerAllocator.requestContainer(containerId, ContainerAllocator.ANY_HOST);
+        }
+        break;
+
+      default:
+        // TODO: Handle failure more intelligently. Should track NodeFailures!
+        log.info("Container failed for some reason. Let's start it again");
+        log.info("Container " + containerIdStr + " failed with exit code " + exitStatus + " - " + containerStatus.getDiagnostics());
+
+        state.failedContainers.incrementAndGet();
+        state.jobHealthy.set(false);
+
+        if(containerId != -1) {
+          state.neededContainers.incrementAndGet();
+          // Find out previously running container location
+          String lastSeenOn = state.jobCoordinator.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
+          if (!hostAffinityEnabled || lastSeenOn == null) {
+            lastSeenOn = ContainerAllocator.ANY_HOST;
+          }
+          // A container failed for an unknown reason. Let's check to see if
+          // we need to shutdown the whole app master if too many container
+          // failures have happened. The rules for failing are that the
+          // failure count for a task group id must be >= the configured retry
+          // count, and the last failure (the one prior to this one) must have
+          // happened less than retry window ms ago. If retry count is set to
+          // 0, the app master will fail on any container failure. If the
+          // retry count is set to a number < 0, a container failure will
+          // never trigger an app master failure.
+          int retryCount = yarnConfig.getContainerRetryCount();
+          int retryWindowMs = yarnConfig.getContainerRetryWindowMs();
+
+          if (retryCount == 0) {
+            log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, containerIdStr);
+            tooManyFailedContainers = true;
+            state.status = FinalApplicationStatus.FAILED;
+          } else if (retryCount > 0) {
+            int currentFailCount;
+            long lastFailureTime;
+            if(containerFailures.containsKey(containerId)) {
+              ContainerFailure failure = containerFailures.get(containerId);
+              currentFailCount = failure.getCount() + 1;
+              lastFailureTime = failure.getLastFailure();
+            } else {
+              currentFailCount = 1;
+              lastFailureTime = 0L;
+            }
+            if (currentFailCount >= retryCount) {
+              long lastFailureMsDiff = System.currentTimeMillis() - lastFailureTime;
+
+              if (lastFailureMsDiff < retryWindowMs) {
+                log.error("Container ID " + containerId + "(" + containerIdStr + ") has failed " + currentFailCount +
+                    " times, with last failure " + lastFailureMsDiff + "ms ago. This is greater than retry count of " +
+                    retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed.");
+
+                // We have too many failures, and we're within the window
+                // boundary, so shut down the app master.
+                tooManyFailedContainers = true;
+                state.status = FinalApplicationStatus.FAILED;
+              } else {
+                log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for " +
+                    "this container ID was outside the bounds of the retry window.", containerId, containerIdStr);
+
+                // Reset counter back to 1, since the last failure for this
+                // container happened outside the window boundary.
+                containerFailures.put(containerId, new ContainerFailure(1, System.currentTimeMillis()));
+              }
+            } else {
+              log.info("Current fail count for container ID {} is {}.", containerId, currentFailCount);
+              containerFailures.put(containerId, new ContainerFailure(currentFailCount, System.currentTimeMillis()));
+            }
+          }
+
+          if (!tooManyFailedContainers) {
+            // Request a new container
+            containerAllocator.requestContainer(containerId, lastSeenOn);
+          }
+        }
+
+    }
+  }
+}