You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/07/12 18:28:23 UTC

[3/3] samza git commit: SAMZA-903: Refactor UI state variables

SAMZA-903: Refactor UI state variables


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9396ee5c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9396ee5c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9396ee5c

Branch: refs/heads/master
Commit: 9396ee5cc0a35e4e32844547eacebb24ae971c67
Parents: 920f803
Author: Jagadish Venkatraman <ja...@gmail.com>
Authored: Mon Jul 11 17:10:03 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Jul 12 10:08:51 2016 -0700

----------------------------------------------------------------------
 .../AbstractContainerAllocator.java             |   2 +-
 .../clustermanager/ContainerProcessManager.java |   8 +-
 .../clustermanager/SamzaApplicationState.java   |   8 +-
 .../ContainerProcessManagerMetrics.scala        |   2 +-
 .../clustermanager/TestContainerAllocator.java  |   4 +-
 .../TestContainerProcessManager.java            |   2 +-
 .../TestHostAwareContainerAllocator.java        |   4 +-
 samza-shell/src/main/bash/run-am.sh             |  25 -
 .../job/yarn/AbstractContainerAllocator.java    | 227 ---------
 .../samza/job/yarn/ContainerAllocator.java      |  54 --
 .../apache/samza/job/yarn/ContainerFailure.java |  48 --
 .../samza/job/yarn/ContainerRequestState.java   | 283 -----------
 .../apache/samza/job/yarn/ContainerUtil.java    | 267 ----------
 .../job/yarn/HostAwareContainerAllocator.java   |  89 ----
 .../apache/samza/job/yarn/SamzaAppState.java    | 212 --------
 .../samza/job/yarn/SamzaContainerRequest.java   | 113 -----
 .../apache/samza/job/yarn/SamzaTaskManager.java | 285 -----------
 .../org/apache/samza/job/yarn/YarnAppState.java |  15 +-
 .../job/yarn/YarnClusterResourceManager.java    |  38 +-
 .../resources/scalate/WEB-INF/views/index.scaml |  29 +-
 .../apache/samza/job/yarn/SamzaAppMaster.scala  | 172 -------
 .../job/yarn/SamzaAppMasterLifecycle.scala      |  67 ---
 .../samza/job/yarn/SamzaAppMasterMetrics.scala  |  96 ----
 .../samza/job/yarn/SamzaAppMasterService.scala  | 101 ----
 .../job/yarn/SamzaYarnAppMasterLifecycle.scala  |   6 +-
 .../job/yarn/SamzaYarnAppMasterService.scala    |  46 +-
 .../org/apache/samza/job/yarn/YarnJob.scala     |   4 +-
 .../webapp/ApplicationMasterRestServlet.scala   |   9 +-
 .../webapp/ApplicationMasterWebServlet.scala    |   6 +-
 .../samza/job/yarn/TestContainerAllocator.java  | 137 -----
 .../job/yarn/TestContainerAllocatorCommon.java  | 225 ---------
 .../job/yarn/TestContainerRequestState.java     | 221 --------
 .../yarn/TestHostAwareContainerAllocator.java   | 253 ----------
 .../job/yarn/TestSamzaContainerRequest.java     |  52 --
 .../samza/job/yarn/TestSamzaTaskManager.java    | 502 -------------------
 .../job/yarn/util/MockContainerAllocator.java   |  53 --
 .../yarn/util/MockContainerRequestState.java    |  90 ----
 .../samza/job/yarn/util/MockContainerUtil.java  |  79 ---
 .../apache/samza/job/yarn/util/TestUtil.java    | 263 ----------
 .../apache/samza/job/yarn/MockSystemAdmin.scala |  54 ++
 .../samza/job/yarn/MockSystemFactory.scala      |  43 ++
 .../samza/job/yarn/TestSamzaAppMaster.scala     | 223 --------
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  | 128 -----
 .../job/yarn/TestSamzaAppMasterService.scala    | 153 ------
 .../yarn/TestSamzaYarnAppMasterLifecycle.scala  | 142 ++++++
 .../yarn/TestSamzaYarnAppMasterService.scala    | 121 +++++
 46 files changed, 475 insertions(+), 4486 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index 097a476..d47f217 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -155,7 +155,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
       //launches a StreamProcessor on the resource
       clusterResourceManager.launchStreamProcessor(resource, builder);
 
-      if (state.neededResources.decrementAndGet() == 0) {
+      if (state.neededContainers.decrementAndGet() == 0) {
         state.jobHealthy.set(true);
       }
       state.runningContainers.put(request.getContainerID(), resource);

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index f2db34c..c6bfec0 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -169,7 +169,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     final int containerCount = jobConfig.getContainerCount();
 
     state.containerCount.set(containerCount);
-    state.neededResources.set(containerCount);
+    state.neededContainers.set(containerCount);
 
     // Request initial set of containers
     Map<Integer, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
@@ -249,7 +249,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         state.completedContainers.incrementAndGet();
 
         if (containerId != -1) {
-          state.finishedContainers.add(containerId);
+          state.finishedContainers.incrementAndGet();
           containerFailures.remove(containerId);
         }
 
@@ -277,7 +277,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         if (containerId != -1) {
           log.info("Released container {} was assigned task group ID {}. Requesting a refactor container for the task group.", containerIdStr, containerId);
 
-          state.neededResources.incrementAndGet();
+          state.neededContainers.incrementAndGet();
           state.jobHealthy.set(false);
 
           // request a container on refactor host
@@ -295,7 +295,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         state.jobHealthy.set(false);
 
         if (containerId != -1) {
-          state.neededResources.incrementAndGet();
+          state.neededContainers.incrementAndGet();
           // Find out previously running container location
           String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
           if (!hostAffinityEnabled || lastSeenOn == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index ca277b3..cf91044 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -21,8 +21,6 @@ package org.apache.samza.clustermanager;
 
 import org.apache.samza.coordinator.JobModelManager;
 
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -87,15 +85,15 @@ public class SamzaApplicationState {
   public final AtomicInteger containerCount = new AtomicInteger(0);
 
   /**
-   * Set of finished containers - TODO: Can be changed to a counter
+   * Number of finished containers
    */
-  public final Set<Integer> finishedContainers = new HashSet<Integer>();
+  public final AtomicInteger finishedContainers = new AtomicInteger(0);
 
   /**
    *  Number of containers needed for the job to be declared healthy
    *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
    */
-  public final AtomicInteger neededResources = new AtomicInteger(0);
+  public final AtomicInteger neededContainers = new AtomicInteger(0);
 
   /**
    *  Map of the samzaContainerId to the {@link SamzaResource} on which it is running

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 86c2440..f24beb1 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -56,7 +56,7 @@ class ContainerProcessManagerMetrics(
 
    def start() {
     val mRunningContainers = newGauge("running-containers", () => state.runningContainers.size)
-    val mNeededContainers = newGauge("needed-containers", () => state.neededResources.get())
+    val mNeededContainers = newGauge("needed-containers", () => state.neededContainers.get())
     val mCompletedContainers = newGauge("completed-containers", () => state.completedContainers.get())
     val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
     val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index f147570..5351bc3 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -254,11 +254,11 @@ public class TestContainerAllocator {
 
             // This routine should be called after the retry is assigned, but before it's started.
             // So there should still be 1 container needed.
-            assertEquals(1, state.neededResources.get());
+            assertEquals(1, state.neededContainers.get());
           }
         }, null
     );
-    state.neededResources.set(1);
+    state.neededContainers.set(1);
     requestState.registerContainerListener(listener);
 
     containerAllocator.requestResource(0, "2");

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 4fd1018..57a5da6 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -192,7 +192,7 @@ public class TestContainerProcessManager {
     assertTrue(isRunning);
 
     // Verify the remaining state
-    assertEquals(1, state.neededResources.get());
+    assertEquals(1, state.neededContainers.get());
     assertEquals(1, allocator.requestedContainers);
 
     taskManager.stop();

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index 57fef12..b6651f2 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -230,11 +230,11 @@ public class TestHostAwareContainerAllocator {
 
             // This routine should be called after the retry is assigned, but before it's started.
             // So there should still be 1 container needed.
-            assertEquals(1, state.neededResources.get());
+            assertEquals(1, state.neededContainers.get());
           }
         }, null
     );
-    state.neededResources.set(1);
+    state.neededContainers.set(1);
     requestState.registerContainerListener(listener);
 
     // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-shell/src/main/bash/run-am.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-am.sh b/samza-shell/src/main/bash/run-am.sh
deleted file mode 100755
index ca938cc..0000000
--- a/samza-shell/src/main/bash/run-am.sh
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Check if server is set. If not - set server optimization
-[[ $JAVA_OPTS != *-server* ]] && export JAVA_OPTS="$JAVA_OPTS -server"
-
-# Set container name system properties for use in Log4J
-[[ $JAVA_OPTS != *-Dsamza.container.name* ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-application-master"
-
-exec $(dirname $0)/run-class.sh org.apache.samza.job.yarn.SamzaAppMaster "$@"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
deleted file mode 100644
index b4789e6..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.util.List;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.YarnConfig;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This class is responsible for making requests for containers to the AM and also, assigning a container to run on an allocated resource.
- *
- * Since we are using a simple thread based allocation of a container to an allocated resource, the subclasses should implement {@link java.lang.Runnable} interface.
- * The allocator thread follows the lifecycle of the {@link org.apache.samza.job.yarn.SamzaTaskManager}. Depending on whether host-affinity is enabled or not, the allocation model varies.
- *
- * See {@link org.apache.samza.job.yarn.ContainerAllocator} and {@link org.apache.samza.job.yarn.HostAwareContainerAllocator}
- */
-public abstract class AbstractContainerAllocator implements Runnable {
-  private static final Logger log = LoggerFactory.getLogger(AbstractContainerAllocator.class);
-
-  public static final String ANY_HOST = ContainerRequestState.ANY_HOST;
-  public static final int DEFAULT_PRIORITY = 0;
-  public static final int DEFAULT_CONTAINER_MEM = 1024;
-  public static final int DEFAULT_CPU_CORES = 1;
-
-  protected final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
-  protected final int ALLOCATOR_SLEEP_TIME;
-  protected final ContainerUtil containerUtil;
-  protected final int containerMaxMemoryMb;
-  protected final int containerMaxCpuCore;
-
-  // containerRequestState indicate the state of all unfulfilled container requests and allocated containers
-  private final ContainerRequestState containerRequestState;
-
-  // state that controls the lifecycle of the allocator thread
-  private AtomicBoolean isRunning = new AtomicBoolean(true);
-
-  public AbstractContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-                            ContainerUtil containerUtil,
-                            ContainerRequestState containerRequestState,
-                            YarnConfig yarnConfig) {
-    this.amClient = amClient;
-    this.containerUtil = containerUtil;
-    this.ALLOCATOR_SLEEP_TIME = yarnConfig.getAllocatorSleepTime();
-    this.containerRequestState = containerRequestState;
-    this.containerMaxMemoryMb = yarnConfig.getContainerMaxMemoryMb();
-    this.containerMaxCpuCore = yarnConfig.getContainerMaxCpuCores();
-  }
-
-  /**
-   * Continuously assigns requested containers to the allocated containers provided by the cluster manager.
-   * The loop frequency is governed by thread sleeps for ALLOCATOR_SLEEP_TIME ms.
-   *
-   * Terminates when the isRunning flag is cleared.
-   */
-  @Override
-  public void run() {
-    while(isRunning.get()) {
-      try {
-        assignContainerRequests();
-
-        // Release extra containers and update the entire system's state
-        containerRequestState.releaseExtraContainers();
-
-        Thread.sleep(ALLOCATOR_SLEEP_TIME);
-      } catch (InterruptedException e) {
-        log.info("Got InterruptedException in AllocatorThread.", e);
-      } catch (Exception e) {
-        log.error("Got unknown Exception in AllocatorThread.", e);
-      }
-    }
-  }
-
-  /**
-   * Assigns the container requests from the queue to the allocated containers from the cluster manager and
-   * runs them.
-   */
-  protected abstract void assignContainerRequests();
-
-  /**
-   * Updates the request state and runs the container on the specified host. Assumes a container
-   * is available on the preferred host, so the caller must verify that before invoking this method.
-   *
-   * @param request             the {@link SamzaContainerRequest} which is being handled.
-   * @param preferredHost       the preferred host on which the container should be run or
-   *                            {@link ContainerRequestState#ANY_HOST} if there is no host preference.
-   */
-  protected void runContainer(SamzaContainerRequest request, String preferredHost) {
-    // Get the available container
-    Container container = peekAllocatedContainer(preferredHost);
-    if (container == null)
-      throw new SamzaException("Expected container was unavailable on host " + preferredHost);
-
-    // Update state
-    containerRequestState.updateStateAfterAssignment(request, preferredHost, container);
-    int expectedContainerId = request.expectedContainerId;
-
-    // Cancel request and run container
-    log.info("Found available containers on {}. Assigning request for container_id {} with "
-            + "timestamp {} to container {}",
-        new Object[]{preferredHost, String.valueOf(expectedContainerId), request.getRequestTimestamp(), container.getId()});
-    try {
-      if (preferredHost.equals(ANY_HOST)) {
-        containerUtil.runContainer(expectedContainerId, container);
-      } else {
-        containerUtil.runMatchedContainer(expectedContainerId, container);
-      }
-    } catch (SamzaContainerLaunchException e) {
-      log.warn(String.format("Got exception while starting container %s. Requesting a new container on any host", container), e);
-      containerRequestState.releaseUnstartableContainer(container);
-      requestContainer(expectedContainerId, ContainerAllocator.ANY_HOST);
-    }
-  }
-
-  /**
-   * Called during initial request for containers
-   *
-   * @param containerToHostMappings Map of containerId to its last seen host (locality).
-   *                                The locality value is null, either
-   *                                - when host-affinity is not enabled, or
-   *                                - when host-affinity is enabled and job is run for the first time
-   */
-  public void requestContainers(Map<Integer, String> containerToHostMappings) {
-    for (Map.Entry<Integer, String> entry : containerToHostMappings.entrySet()) {
-      int containerId = entry.getKey();
-      String preferredHost = entry.getValue();
-      if (preferredHost == null)
-        preferredHost = ANY_HOST;
-
-      requestContainer(containerId, preferredHost);
-    }
-  }
-  /**
-   * Method to request a container resource from yarn
-   *
-   * @param expectedContainerId Identifier of the container that will be run when a container resource is allocated for
-   *                            this request
-   * @param preferredHost Name of the host that you prefer to run the container on
-   */
-  public final void requestContainer(int expectedContainerId, String preferredHost) {
-    SamzaContainerRequest request = new SamzaContainerRequest(
-        containerMaxMemoryMb,
-        containerMaxCpuCore,
-        DEFAULT_PRIORITY,
-        expectedContainerId,
-        preferredHost);
-    containerRequestState.updateRequestState(request);
-    containerUtil.incrementContainerRequests();
-  }
-
-  /**
-   * @return {@code true} if there is a pending request, {@code false} otherwise.
-   */
-  protected boolean hasPendingRequest() {
-    return peekPendingRequest() != null;
-  }
-
-  /**
-   * Retrieves, but does not remove, the next pending request in the queue.
-   *
-   * @return  the pending request or {@code null} if there is no pending request.
-   */
-  protected SamzaContainerRequest peekPendingRequest() {
-    return containerRequestState.getRequestsQueue().peek();
-  }
-
-  /**
-   * Method that adds allocated container to a synchronized buffer of allocated containers list
-   * See allocatedContainers in {@link org.apache.samza.job.yarn.ContainerRequestState}
-   *
-   * @param container Container resource returned by the RM
-   */
-  public final void addContainer(Container container) {
-    containerRequestState.addContainer(container);
-  }
-
-  /**
-   * @param host  the host for which a container is needed.
-   * @return      {@code true} if there is a container allocated for the specified host, {@code false} otherwise.
-   */
-  protected boolean hasAllocatedContainer(String host) {
-    return peekAllocatedContainer(host) != null;
-  }
-
-  /**
-   * Retrieves, but does not remove, the first allocated container on the specified host.
-   *
-   * @param host  the host for which a container is needed.
-   * @return      the first {@link Container} allocated for the specified host or {@code null} if there isn't one.
-   */
-  protected Container peekAllocatedContainer(String host) {
-    List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(host);
-    if (allocatedContainers == null || allocatedContainers.isEmpty()) {
-      return null;
-    }
-
-    return allocatedContainers.get(0);
-  }
-
-  public final void setIsRunning(boolean state) {
-    isRunning.set(state);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
deleted file mode 100644
index 24ac410..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.util.List;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.config.YarnConfig;
-
-/**
- * This is the default allocator thread that will be used by SamzaTaskManager.
- *
- * When host-affinity is not enabled, this thread periodically wakes up to assign a container to an allocated resource.
- * If there aren't enough containers, it waits by sleeping for {@code ALLOCATOR_SLEEP_TIME} milliseconds.
- */
-public class ContainerAllocator extends AbstractContainerAllocator {
-  public ContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-                            ContainerUtil containerUtil,
-                            YarnConfig yarnConfig) {
-    super(amClient, containerUtil, new ContainerRequestState(amClient, false), yarnConfig);
-  }
-
-  /**
-   * This method tries to allocate any unsatisfied request that is still in the request queue
-   * (See requests in {@link org.apache.samza.job.yarn.ContainerRequestState})
-   * with allocated containers, if any.
-   *
-   * Since host-affinity is not enabled, all allocated container resources are buffered in the list keyed by "ANY_HOST".
-   * */
-  @Override
-  public void assignContainerRequests() {
-    while (hasPendingRequest() && hasAllocatedContainer(ANY_HOST)) {
-      SamzaContainerRequest request = peekPendingRequest();
-      runContainer(request, ANY_HOST);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java
deleted file mode 100644
index 1d15651..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-/**
- * Class that encapsulates information related to a container failure
- * */
-public class ContainerFailure {
-  /**
-   * Number of times a container has failed
-   * */
-  private int count;
-  /**
-   * Latest failure time of the container
-   * */
-  private Long lastFailure;
-
-  public ContainerFailure(int count,
-                          Long lastFailure) {
-    this.count = count;
-    this.lastFailure = lastFailure;
-  }
-
-  public int getCount() {
-    return count;
-  }
-
-  public Long getLastFailure() {
-    return lastFailure;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
deleted file mode 100644
index 57ce350..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class maintains the state variables for all the container requests and the allocated containers returned
- * by the RM
- * Important: Even though we use concurrent data structures, this class is not thread-safe. Thread safety has to be
- * handled by the caller.
- */
-public class ContainerRequestState {
-  private static final Logger log = LoggerFactory.getLogger(ContainerRequestState.class);
-  public static final String ANY_HOST = "ANY_HOST";
-
-  /**
-   * Maintain a map of hostname to a list of containers allocated on this host
-   */
-  private final ConcurrentHashMap<String, List<Container>> allocatedContainers = new ConcurrentHashMap<String, List<Container>>();
-  /**
-   * Represents the queue of container requests made by the {@link org.apache.samza.job.yarn.SamzaTaskManager}
-   */
-  private final PriorityBlockingQueue<SamzaContainerRequest> requestsQueue = new PriorityBlockingQueue<SamzaContainerRequest>();
-  /**
-   * Maintain a map of hostname to the number of requests made for containers on this host
-   * This state variable is used to look-up whether an allocated container on a host was ever requested in the past.
-   * This map is not updated when host-affinity is not enabled
-   */
-  private final ConcurrentHashMap<String, AtomicInteger> requestsToCountMap = new ConcurrentHashMap<String, AtomicInteger>();
-  /**
-   * Indicates whether host-affinity is enabled or not
-   */
-  private final boolean hostAffinityEnabled;
-
-  private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
-
-  // TODO: Refactor such that the state class for host-affinity enabled allocator is a subclass of a generic state class
-  public ContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-                               boolean hostAffinityEnabled) {
-    this.amClient = amClient;
-    this.hostAffinityEnabled = hostAffinityEnabled;
-  }
-
-  /**
-   * This method is called every time {@link org.apache.samza.job.yarn.SamzaTaskManager} requests a container.
-   * Adds the {@link org.apache.samza.job.yarn.SamzaContainerRequest} to the requestsQueue.
-   * If host-affinity is enabled, it updates the requestsToCountMap as well.
-   *
-   * @param request {@link org.apache.samza.job.yarn.SamzaContainerRequest} that was sent to the RM
-   */
-  public synchronized void updateRequestState(SamzaContainerRequest request) {
-
-    log.info("Requesting a container for {} at {}", request.getExpectedContainerId(), request.getPreferredHost());
-    amClient.addContainerRequest(request.getIssuedRequest());
-
-    requestsQueue.add(request);
-    String preferredHost = request.getPreferredHost();
-    if (hostAffinityEnabled) {
-      if (requestsToCountMap.containsKey(preferredHost)) {
-        requestsToCountMap.get(preferredHost).incrementAndGet();
-      } else {
-        requestsToCountMap.put(preferredHost, new AtomicInteger(1));
-      }
-      /**
-       * The following is important to correlate allocated container data with the requests made before. If
-       * the preferredHost is requested for the first time, the state should reflect that the allocatedContainers
-       * list is empty and NOT null.
-       */
-      if (!allocatedContainers.containsKey(preferredHost)) {
-        allocatedContainers.put(preferredHost, new ArrayList<Container>());
-      }
-    }
-  }
-
-  /**
-   * This method is called every time the RM returns an allocated container.
-   * Adds the allocated container resource to the correct allocatedContainers buffer
-   * @param container Container resource that was returned by the RM
-   */
-  public synchronized void addContainer(Container container) {
-    if(hostAffinityEnabled) {
-      String hostName = container.getNodeHttpAddress().split(":")[0];
-      AtomicInteger requestCount = requestsToCountMap.get(hostName);
-      // Check if this host was requested for any of the containers
-      if (requestCount == null || requestCount.get() == 0) {
-        log.debug(
-            "Request count for the allocatedContainer on {} is null or 0. This means that the host was not requested " +
-                "for running containers.Hence, saving the container {} in the buffer for ANY_HOST",
-            hostName,
-            container.getId()
-        );
-        addToAllocatedContainerList(ANY_HOST, container);
-      } else {
-        int requestCountOnThisHost = requestCount.get();
-        List<Container> allocatedContainersOnThisHost = allocatedContainers.get(hostName);
-        if (requestCountOnThisHost > 0) {
-          if (allocatedContainersOnThisHost == null || allocatedContainersOnThisHost.size() < requestCountOnThisHost) {
-            log.debug("Saving the container {} in the buffer for {}", container.getId(), hostName);
-            addToAllocatedContainerList(hostName, container);
-          }
-          else {
-              /**
-               * The RM may allocate more containers on a given host than requested. In such a case, even though the
-               * requestCount != 0, the buffer for that host already holds as many containers as were requested.
-               * Hence, the extra container should be assigned to ANY_HOST
-               */
-              log.debug(
-                  "The number of containers already allocated on {} is greater than what was " +
-                      "requested, which is {}. Hence, saving the container {} in the buffer for ANY_HOST",
-                  new Object[]{
-                      hostName,
-                      requestCountOnThisHost,
-                      container.getId()
-                  }
-              );
-              addToAllocatedContainerList(ANY_HOST, container);
-            }
-          }
-        }
-      }
-     else {
-      log.debug("Saving the container {} in the buffer for ANY_HOST", container.getId());
-      addToAllocatedContainerList(ANY_HOST, container);
-    }
-  }
-
-  // Update the allocatedContainers list
-  private void addToAllocatedContainerList(String host, Container container) {
-    List<Container> list = allocatedContainers.get(host);
-    if (list != null) {
-      list.add(container);
-    } else {
-      list = new ArrayList<Container>();
-      list.add(container);
-      allocatedContainers.put(host, list);
-    }
-  }
-
-  /**
-   * This method updates the state after a request is fulfilled and a container starts running on a host
-   * Needs to be synchronized because the state buffers are populated by the AMRMCallbackHandler, whereas they are drained by the allocator thread
-   *
-   * @param request {@link org.apache.samza.job.yarn.SamzaContainerRequest} that was fulfilled
-   * @param assignedHost  Host to which the container was assigned
-   * @param container Allocated container resource that was used to satisfy this request
-   */
-  public synchronized void updateStateAfterAssignment(SamzaContainerRequest request, String assignedHost, Container container) {
-    requestsQueue.remove(request);
-    allocatedContainers.get(assignedHost).remove(container);
-    if (hostAffinityEnabled) {
-      // assignedHost may not always be the preferred host.
-      // Hence, we should safely decrement the counter for the preferredHost
-      requestsToCountMap.get(request.getPreferredHost()).decrementAndGet();
-    }
-    // To avoid getting back excess containers
-    amClient.removeContainerRequest(request.getIssuedRequest());
-  }
-
-  /**
-   * If requestsQueue is empty, releases all extra containers in the buffer and clears the request state.
-   * Needs to be synchronized because it is modifying shared state buffers
-   * @return the number of containers released.
-   */
-  public synchronized int releaseExtraContainers() {
-    int numReleasedContainers = 0;
-
-    if (requestsQueue.isEmpty()) {
-      log.debug("Container Requests Queue is empty.");
-
-      if (hostAffinityEnabled) {
-        List<String> allocatedHosts = getAllocatedHosts();
-        for (String host : allocatedHosts) {
-          numReleasedContainers += releaseContainersForHost(host);
-        }
-      } else {
-        numReleasedContainers += releaseContainersForHost(ANY_HOST);
-      }
-      clearState();
-    }
-    return numReleasedContainers;
-  }
-
-  /**
-   * Releases all allocated containers for the specified host.
-   * @param host  the host for which the containers should be released.
-   * @return      the number of containers released.
-   */
-  private int releaseContainersForHost(String host) {
-    int numReleasedContainers = 0;
-    List<Container> containers = getContainersOnAHost(host);
-    if (containers != null) {
-      for (Container c : containers) {
-        log.info("Releasing extra container {} allocated on {}", c.getId(), host);
-        amClient.releaseAssignedContainer(c.getId());
-        numReleasedContainers++;
-      }
-    }
-    return numReleasedContainers;
-  }
-
-  /**
-   * Releases a container that was allocated and assigned but could not be started.
-   * e.g. because of a ConnectException while trying to communicate with the NM.
-   * This method assumes the specified container and associated request have already
-   * been removed from their respective queues.
-   *
-   * @param container the {@link Container} to release.
-   */
-  public void releaseUnstartableContainer(Container container) {
-    log.info("Releasing unstartable container {}", container.getId());
-    amClient.releaseAssignedContainer(container.getId());
-  }
-
-  /**
-   * Clears all the state variables
-   * Performed when there are no more unfulfilled requests
-   * This is not synchronized because it is private and is only called from the synchronized releaseExtraContainers().
-   */
-  private void clearState() {
-    allocatedContainers.clear();
-    requestsToCountMap.clear();
-    requestsQueue.clear();
-  }
-
-  /**
-   * Returns the list of hosts which have at least one allocated container in the buffer
-   * @return list of host names
-   */
-  private List<String> getAllocatedHosts() {
-    List<String> hostKeys = new ArrayList<String>();
-    for(Map.Entry<String, List<Container>> entry: allocatedContainers.entrySet()) {
-      if(entry.getValue().size() > 0) {
-        hostKeys.add(entry.getKey());
-      }
-    }
-    return hostKeys;
-  }
-
-  /**
-   * Returns the list of containers allocated on a given host
-   * If no containers were ever allocated on the given host, it returns null.
-   * @param host hostname
-   * @return list of containers allocated on the given host, or null
-   */
-  public List<Container> getContainersOnAHost(String host) {
-    return allocatedContainers.get(host);
-  }
-
-  public PriorityBlockingQueue<SamzaContainerRequest> getRequestsQueue() {
-    return requestsQueue;
-  }
-
-  public ConcurrentHashMap<String, AtomicInteger> getRequestsToCountMap() {
-    return requestsToCountMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
deleted file mode 100644
index e8976bc..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.job.CommandBuilder;
-import org.apache.samza.job.ShellCommandBuilder;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ContainerUtil {
-  private static final Logger log = LoggerFactory.getLogger(ContainerUtil.class);
-
-  private final Config config;
-  private final SamzaAppState state;
-  private final YarnConfiguration yarnConfiguration;
-
-  private NMClient nmClient;
-  private final YarnConfig yarnConfig;
-  private final TaskConfig taskConfig;
-
-  public ContainerUtil(Config config,
-                       SamzaAppState state,
-                       YarnConfiguration yarnConfiguration) {
-    this.config = config;
-    this.state = state;
-    this.yarnConfiguration = yarnConfiguration;
-
-    this.nmClient = NMClient.createNMClient();
-    nmClient.init(this.yarnConfiguration);
-
-    this.yarnConfig = new YarnConfig(config);
-    this.taskConfig = new TaskConfig(config);
-  }
-
-  protected void setNmClient(NMClient nmClient){
-    this.nmClient = nmClient;
-  }
-
-  public void incrementContainerRequests() {
-    state.containerRequests.incrementAndGet();
-  }
-
-  public void runMatchedContainer(int samzaContainerId, Container container) throws SamzaContainerLaunchException {
-    state.matchedContainerRequests.incrementAndGet();
-    runContainer(samzaContainerId, container);
-  }
-
-  public void runContainer(int samzaContainerId, Container container) throws SamzaContainerLaunchException {
-    String containerIdStr = ConverterUtils.toString(container.getId());
-    log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);
-
-
-    // check if we have framework path specified. If yes - use it, if not use default ./__package/
-    String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries
-    String cmdPath = "./__package/";
-
-    String fwkPath = JobConfig.getFwkPath(config);
-    if(fwkPath != null && (! fwkPath.isEmpty())) {
-      cmdPath = fwkPath;
-      jobLib = "export JOB_LIB_DIR=./__package/lib";
-    }
-    log.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + cmdPath + ";jobLib=" + jobLib);
-
-
-    CommandBuilder cmdBuilder = getCommandBuilder(samzaContainerId, cmdPath);
-    String command = cmdBuilder.buildCommand();
-    log.info("Container ID {} using command {}", samzaContainerId, command);
-
-    Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
-    printContainerEnvironmentVariables(samzaContainerId, env);
-
-    log.info("Samza FWK path: " + command + "; env=" + env);
-
-    Path path = new Path(yarnConfig.getPackagePath());
-    log.info("Starting container ID {} using package path {}", samzaContainerId, path);
-
-    startContainer(path, container, env,
-        getFormattedCommand(ApplicationConstants.LOG_DIR_EXPANSION_VAR, jobLib, command, ApplicationConstants.STDOUT,
-            ApplicationConstants.STDERR));
-
-    if (state.neededContainers.decrementAndGet() == 0) {
-      state.jobHealthy.set(true);
-    }
-    state.runningContainers.put(samzaContainerId, new YarnContainer(container));
-
-    log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).",
-        new Object[]{samzaContainerId, containerIdStr, container
-            .getNodeId().getHost(), container.getNodeHttpAddress(), containerIdStr});
-
-    log.info("Started container ID {}", samzaContainerId);
-  }
-
-  protected void startContainer(Path packagePath,
-                                Container container,
-                                Map<String, String> env,
-                                final String cmd)
-      throws SamzaContainerLaunchException {
-    log.info("Starting container {} {} {} {}",
-        new Object[]{packagePath, container, env, cmd});
-
-    // set the local package so that the containers and app master are provisioned with it
-    LocalResource packageResource = Records.newRecord(LocalResource.class);
-    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
-    FileStatus fileStatus;
-    try {
-      fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
-    } catch (IOException ioe) {
-      log.error("IO Exception when accessing the package status from the filesystem", ioe);
-      throw new SamzaException("IO Exception when accessing the package status from the filesystem");
-    }
-
-    packageResource.setResource(packageUrl);
-    packageResource.setSize(fileStatus.getLen());
-    packageResource.setTimestamp(fileStatus.getModificationTime());
-    packageResource.setType(LocalResourceType.ARCHIVE);
-    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
-
-    ByteBuffer allTokens;
-    // copy tokens (copied from dist shell example)
-    try {
-      Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-      DataOutputBuffer dob = new DataOutputBuffer();
-      credentials.writeTokenStorageToStream(dob);
-
-      // now remove the AM->RM token so that containers cannot access it
-      Iterator iter = credentials.getAllTokens().iterator();
-      while (iter.hasNext()) {
-        TokenIdentifier token = ((Token) iter.next()).decodeIdentifier();
-        if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
-          iter.remove();
-        }
-      }
-      allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-
-    } catch (IOException ioe) {
-      ioe.printStackTrace();
-      throw new SamzaException("IO Exception when writing credentials to output buffer");
-    }
-
-    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
-    context.setEnvironment(env);
-    context.setTokens(allTokens.duplicate());
-    context.setCommands(new ArrayList<String>() {{add(cmd);}});
-    context.setLocalResources(Collections.singletonMap("__package", packageResource));
-
-    log.debug("setting package to {}", packageResource);
-    log.debug("setting context to {}", context);
-
-    StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class);
-    startContainerRequest.setContainerLaunchContext(context);
-    try {
-      nmClient.startContainer(container, context);
-    } catch (YarnException ye) {
-      log.error("Received YarnException when starting container: " + container.getId(), ye);
-      throw new SamzaContainerLaunchException("Received YarnException when starting container: " + container.getId(), ye);
-    } catch (IOException ioe) {
-      log.error("Received IOException when starting container: " + container.getId(), ioe);
-      throw new SamzaContainerLaunchException("Received IOException when starting container: " + container.getId(), ioe);
-    }
-  }
-
-  private String getFormattedCommand(String logDirExpansionVar,
-                                     String jobLib,
-                                     String command,
-                                     String stdOut,
-                                     String stdErr) {
-    if (!jobLib.isEmpty()) {
-      jobLib = "&& " + jobLib; // add job's libraries exported to an env variable
-    }
-
-    return String
-        .format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar,
-            jobLib, logDirExpansionVar, command, stdOut, stdErr);
-  }
-
-  /**
-   * Instantiates and initializes the configured {@link CommandBuilder} class.
-   *
-   * @param samzaContainerId  the Samza container Id for which the container start command will be built.
-   * @return                  the command builder, which is initialized and ready to build the command.
-   */
-  private CommandBuilder getCommandBuilder(int samzaContainerId, String cmdPath) {
-    String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
-    CommandBuilder cmdBuilder = (CommandBuilder) Util.getObj(cmdBuilderClassName);
-    cmdBuilder.setConfig(config).setId(samzaContainerId).setUrl(state.coordinatorUrl).setCommandPath(cmdPath);
-    return cmdBuilder;
-  }
-
-  /**
-   * Gets the environment variables from the specified {@link CommandBuilder} and escapes certain characters.
-   *
-   * @param cmdBuilder        the command builder containing the environment variables.
-   * @return                  the map containing the escaped environment variables.
-   */
-  private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
-    Map<String, String> env = new HashMap<String, String>();
-    for (Map.Entry<String, String> entry : cmdBuilder.buildEnvironment().entrySet()) {
-      String escapedValue = Util.envVarEscape(entry.getValue());
-      env.put(entry.getKey(), escapedValue);
-    }
-
-    return env;
-  }
-
-  /**
-   * @param samzaContainerId  the Samza container Id for logging purposes.
-   * @param env               the Map of environment variables to their respective values.
-   */
-  private void printContainerEnvironmentVariables(int samzaContainerId, Map<String, String> env) {
-    StringBuilder sb = new StringBuilder();
-    for (Map.Entry<String, String> entry : env.entrySet()) {
-      sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue()));
-    }
-    log.info("Container ID {} using environment variables: {}", samzaContainerId, sb.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
deleted file mode 100644
index 1d101fa..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.config.YarnConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is the allocator thread that will be used by SamzaTaskManager when host-affinity is enabled for a job. It is similar to {@link org.apache.samza.job.yarn.ContainerAllocator}, except that it considers container locality for allocation.
- *
- * In case of host-affinity, each container request ({@link org.apache.samza.job.yarn.SamzaContainerRequest}) encapsulates the identifier of the container to be run and a "preferredHost". preferredHost is determined by the locality mappings in the coordinator stream.
- * This thread periodically wakes up and makes a best-effort attempt to assign a container to the preferredHost. If the preferredHost is not returned by the RM before the corresponding container request expires, the thread assigns the container to any other host that is allocated next.
- * The container expiry is determined by CONTAINER_REQUEST_TIMEOUT and is configurable on a per-job basis.
- *
- * If there aren't enough containers, it waits by sleeping for ALLOCATOR_SLEEP_TIME milliseconds.
- */
-public class HostAwareContainerAllocator extends AbstractContainerAllocator {
-  private static final Logger log = LoggerFactory.getLogger(HostAwareContainerAllocator.class);
-
-  private final int CONTAINER_REQUEST_TIMEOUT;
-
-  public HostAwareContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-                                     ContainerUtil containerUtil,
-                                     YarnConfig yarnConfig) {
-    super(amClient, containerUtil, new ContainerRequestState(amClient, true), yarnConfig);
-    this.CONTAINER_REQUEST_TIMEOUT = yarnConfig.getContainerRequestTimeout();
-  }
-
-  /**
-   * Since host-affinity is enabled, all allocated container resources are buffered in the list keyed by "preferredHost".
-   *
-   * If the requested host is not available, the thread checks to see if the request has expired.
-   * If it has expired, it runs the container with expectedContainerID on one of the available hosts from the
-   * allocatedContainers buffer keyed by "ANY_HOST".
-   */
-  @Override
-  public void assignContainerRequests() {
-    while (hasPendingRequest()) {
-      SamzaContainerRequest request = peekPendingRequest();
-      String preferredHost = request.getPreferredHost();
-      int expectedContainerId = request.getExpectedContainerId();
-
-      log.info("Handling request for container id {} on preferred host {}", expectedContainerId, preferredHost);
-
-      if (hasAllocatedContainer(preferredHost)) {
-        // Found allocated container at preferredHost
-        runContainer(request, preferredHost);
-      } else {
-        // No allocated container on preferredHost
-        log.info("Did not find any allocated containers on preferred host {} for running container id {}",
-            preferredHost, expectedContainerId);
-
-        boolean expired = requestExpired(request);
-        if (expired && hasAllocatedContainer(ANY_HOST)) {
-          runContainer(request, ANY_HOST);
-        } else {
-          log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't "
-                  + "find any free allocated containers in the buffer. Breaking out of loop.",
-              request.getRequestTimestamp(), CONTAINER_REQUEST_TIMEOUT);
-          break;
-        }
-      }
-    }
-
-  }
-
-  private boolean requestExpired(SamzaContainerRequest request) {
-    return System.currentTimeMillis() - request.getRequestTimestamp() > CONTAINER_REQUEST_TIMEOUT;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
deleted file mode 100644
index c116ed8..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-import java.util.Map;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.samza.coordinator.JobModelManager;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-
-import java.net.URL;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class SamzaAppState {
-  /**
-   * Represents an invalid or unknown Samza container ID.
-   */
-  private static final int UNUSED_CONTAINER_ID = -1;
-
-  /**
-   * Job Coordinator is started in the AM and follows the {@link org.apache.samza.job.yarn.SamzaAppMasterService}
-   * lifecycle. It helps querying JobModel related info in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
-   * and locality information when host-affinity is enabled in {@link org.apache.samza.job.yarn.SamzaTaskManager}
-   */
-  public final JobModelManager jobCoordinator;
-
-  /*  The following state variables are primarily used for reference in the AM web services   */
-  /**
-   * Task Id of the AM
-   * Used for displaying in the AM UI. Usage found in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
-   * and scalate/WEB-INF/views/index.scaml
-   */
-  public final int taskId;
-  /**
-   * Id of the AM container (as allocated by the RM)
-   * Used for displaying in the AM UI. Usage in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
-   * and scalate/WEB-INF/views/index.scaml
-   */
-  public final ContainerId amContainerId;
-  /**
-   * Host name of the NM on which the AM is running
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   */
-  public final String nodeHost;
-  /**
-   * NM port on which the AM is running
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   */
-  public final int nodePort;
-  /**
-   * Http port of the NM on which the AM is running
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   */
-  public final int nodeHttpPort;
-  /**
-   * Application Attempt Id as provided by Yarn
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   * and {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
-   */
-  public final ApplicationAttemptId appAttemptId;
-  /**
-   * JMX Server URL, if enabled
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   */
-  public String jmxUrl = "";
-  /**
-   * JMX Server Tunneling URL, if enabled
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   */
-  public String jmxTunnelingUrl = "";
-  /**
-   * Job Coordinator URL
-   * Usage in {@link org.apache.samza.job.yarn.SamzaAppMasterService} &amp; ContainerUtil
-   */
-  public URL coordinatorUrl = null;
-  /**
-   * URL of the {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
-   */
-  public URL rpcUrl = null;
-  /**
-   * URL of the {@link org.apache.samza.webapp.ApplicationMasterWebServlet}
-   */
-  public URL trackingUrl = null;
-
-  /**
-   * The following state variables are required for the correct functioning of the TaskManager
-   * Some of them are shared between the AMRMCallbackThread and the ContainerAllocator thread, as mentioned below.
-   */
-
-  /**
-   * Number of containers that have completed their execution and exited successfully
-   */
-  public AtomicInteger completedContainers = new AtomicInteger(0);
-
-  /**
-   * Number of failed containers
-   * */
-  public AtomicInteger failedContainers = new AtomicInteger(0);
-
-  /**
-   * Number of containers released due to extra allocation returned by the RM
-   */
-  public AtomicInteger releasedContainers = new AtomicInteger(0);
-
-  /**
-   * ContainerStatus of failed containers.
-   */
-  public ConcurrentMap<String, ContainerStatus> failedContainersStatus = new ConcurrentHashMap<String, ContainerStatus>();
-
-  /**
-   * Number of containers configured for the job
-   */
-  public int containerCount = 0;
-
-  /**
-   * Set of finished containers - TODO: Can be changed to a counter
-   */
-  public Set<Integer> finishedContainers = new HashSet<Integer>();
-
-  /**
-   *  Number of containers needed for the job to be declared healthy
-   *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
-   */
-  public AtomicInteger neededContainers = new AtomicInteger(0);
-
-  /**
-   *  Map of the samzaContainerId to the {@link org.apache.samza.job.yarn.YarnContainer} on which it is running
-   *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
-   */
-  public ConcurrentMap<Integer, YarnContainer> runningContainers = new ConcurrentHashMap<Integer, YarnContainer>(0);
-
-  /**
-   * Final status of the application
-   * Modified by both the AMRMCallbackThread and the ContainerAllocator thread
-   */
-  public FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
-
-  /**
-   * State indicating whether the job is healthy or not
-   * Modified by both the AMRMCallbackThread and the ContainerAllocator thread
-   */
-  public AtomicBoolean jobHealthy = new AtomicBoolean(true);
-
-  public AtomicInteger containerRequests = new AtomicInteger(0);
-
-  public AtomicInteger matchedContainerRequests = new AtomicInteger(0);
-
-  public SamzaAppState(JobModelManager jobModelManager,
-                       int taskId,
-                       ContainerId amContainerId,
-                       String nodeHost,
-                       int nodePort,
-                       int nodeHttpPort) {
-    this.jobCoordinator = jobModelManager;
-    this.taskId = taskId;
-    this.amContainerId = amContainerId;
-    this.nodeHost = nodeHost;
-    this.nodePort = nodePort;
-    this.nodeHttpPort = nodeHttpPort;
-    this.appAttemptId = amContainerId.getApplicationAttemptId();
-
-  }
-
-  /**
-   * Returns the Samza container ID if the specified YARN container ID corresponds to a running container.
-   *
-   * @param yarnContainerId the YARN container ID.
-   * @return                the Samza container ID if it is running,
-   *                        otherwise {@link SamzaAppState#UNUSED_CONTAINER_ID}.
-   */
-  public int getRunningSamzaContainerId(ContainerId yarnContainerId) {
-    int containerId = UNUSED_CONTAINER_ID;
-    for(Map.Entry<Integer, YarnContainer> entry: runningContainers.entrySet()) {
-      if(entry.getValue().id().equals(yarnContainerId)) {
-        containerId = entry.getKey();
-        break;
-      }
-    }
-    return containerId;
-  }
-
-  /**
-   * @param samzaContainerId  the Samza container ID to validate.
-   * @return                  {@code true} if the ID is valid, {@code false} otherwise
-   */
-  public static boolean isValidContainerId(int samzaContainerId) {
-    return samzaContainerId != UNUSED_CONTAINER_ID;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
deleted file mode 100644
index 4a04eb6..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-
-/**
- * SamzaContainerRequest encapsulate the ContainerRequest and the meta-information of the request.
- */
-public class SamzaContainerRequest implements Comparable<SamzaContainerRequest> {
-  private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
-
-  private Priority priority;
-  private Resource capability;
-
-  /**
-   *  If host-affinity is enabled, the request specifies a preferred host for the container
-   *  If not, preferredHost defaults to ANY_HOST
-   */
-  private String preferredHost;
-  // Timestamp at which the request is issued. Used to check request expiry
-  private Long requestTimestamp;
-  // Actual Container Request that is issued to the RM
-  public AMRMClient.ContainerRequest issuedRequest;
-  // Container Id that is expected to run on the container returned for this request
-  public int expectedContainerId;
-
-  public SamzaContainerRequest(int memoryMb, int cpuCores, int priority, int expectedContainerId, String preferredHost) {
-    this.capability = Resource.newInstance(memoryMb, cpuCores);
-    this.priority = Priority.newInstance(priority);
-    this.expectedContainerId = expectedContainerId;
-    if (preferredHost == null || preferredHost.equals(ANY_HOST)) {
-      this.preferredHost = ANY_HOST;
-      this.issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, this.priority);
-    } else {
-      this.preferredHost = preferredHost;
-      this.issuedRequest = new AMRMClient.ContainerRequest(
-          capability,
-          new String[]{this.preferredHost},
-          null,
-          this.priority);
-    }
-
-    this.requestTimestamp = System.currentTimeMillis();
-  }
-
-  // Convenience constructor for unit testing
-  protected SamzaContainerRequest(int expectedContainerId, String preferredHost) {
-    this(
-        AbstractContainerAllocator.DEFAULT_CONTAINER_MEM,
-        AbstractContainerAllocator.DEFAULT_CPU_CORES,
-        AbstractContainerAllocator.DEFAULT_PRIORITY,
-        expectedContainerId,
-        preferredHost);
-  }
-
-  public Resource getCapability() {
-    return capability;
-  }
-
-  public Priority getPriority() {
-    return priority;
-  }
-
-  public AMRMClient.ContainerRequest getIssuedRequest() {
-    return issuedRequest;
-  }
-
-  public int getExpectedContainerId() {
-    return expectedContainerId;
-  }
-
-  public String getPreferredHost() {
-    return preferredHost;
-  }
-
-  public Long getRequestTimestamp() {
-    return requestTimestamp;
-  }
-
-  @Override
-  public String toString() {
-    return "[requestIssueTime=" + issuedRequest.toString() + "]";
-  }
-
-  @Override
-  public int compareTo(SamzaContainerRequest o) {
-    if(requestTimestamp < o.requestTimestamp)
-      return -1;
-    if(requestTimestamp > o.requestTimestamp)
-      return 1;
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
deleted file mode 100644
index bc95f31..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Samza's application master is mostly interested in requesting containers to
- * run Samza jobs. SamzaTaskManager is responsible for requesting new
- * containers, handling failures, and notifying the application master that the
- * job is done.
- *
- * The following main threads are involved in the execution of the Samza AM:
- *  - The main thread (defined in SamzaAppMaster) that drive the AM to send out container requests to RM
- *  - The callback handler thread that receives the responses from RM and handles:
- *      - Populating a buffer when a container is allocated by the RM
- *        (allocatedContainers in {@link org.apache.samza.job.yarn.ContainerRequestState}
- *      - Identifying the cause of container failure & re-request containers from RM by adding request to the
- *        internal requestQueue in {@link org.apache.samza.job.yarn.ContainerRequestState}
- *  - The allocator thread defined here assigns the allocated containers to pending requests
- *    (See {@link org.apache.samza.job.yarn.ContainerAllocator} or {@link org.apache.samza.job.yarn.HostAwareContainerAllocator})
- */
-
-class SamzaTaskManager implements YarnAppMasterListener {
-  private static final Logger log = LoggerFactory.getLogger(SamzaTaskManager.class);
-
-  private final boolean hostAffinityEnabled;
-  private final SamzaAppState state;
-
-  // Derived configs
-  private final JobConfig jobConfig;
-  private final YarnConfig yarnConfig;
-
-  private final AbstractContainerAllocator containerAllocator;
-  private final Thread allocatorThread;
-
-  // State
-  private boolean tooManyFailedContainers = false;
-  private Map<Integer, ContainerFailure> containerFailures = new HashMap<Integer, ContainerFailure>();
-
-  public SamzaTaskManager(Config config,
-                          SamzaAppState state,
-                          AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-                          YarnConfiguration conf) {
-    this.state = state;
-    this.jobConfig = new JobConfig(config);
-    this.yarnConfig = new YarnConfig(config);
-
-    this.hostAffinityEnabled = yarnConfig.getHostAffinityEnabled();
-
-    if (this.hostAffinityEnabled) {
-      this.containerAllocator = new HostAwareContainerAllocator(
-          amClient,
-          new ContainerUtil(config, state, conf),
-          yarnConfig
-      );
-    } else {
-      this.containerAllocator = new ContainerAllocator(
-          amClient,
-          new ContainerUtil(config, state, conf),
-          yarnConfig);
-    }
-
-    this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
-  }
-
-  @Override
-  public boolean shouldShutdown() {
-    return tooManyFailedContainers || state.completedContainers.get() == state.containerCount || !allocatorThread.isAlive();
-  }
-
-  @Override
-  public void onInit() {
-    state.containerCount = jobConfig.getContainerCount();
-
-    state.neededContainers.set(state.containerCount);
-
-    // Request initial set of containers
-    Map<Integer, String> containerToHostMapping = state.jobCoordinator.jobModel().getAllContainerLocality();
-
-    containerAllocator.requestContainers(containerToHostMapping);
-
-    // Start container allocator thread
-    log.info("Starting the container allocator thread");
-    allocatorThread.start();
-  }
-
-  @Override
-  public void onReboot() {
-
-  }
-
-  @Override
-  public void onShutdown() {
-    // Shutdown allocator thread
-    containerAllocator.setIsRunning(false);
-    try {
-      allocatorThread.join();
-    } catch (InterruptedException ie) {
-      log.info("Allocator Thread join() threw an interrupted exception", ie);
-      // Should we throw exception here??
-    }
-  }
-
-  @Override
-  public void onContainerAllocated(Container container) {
-    containerAllocator.addContainer(container);
-  }
-
-  /**
-   * This methods handles the onResourceCompleted callback from the RM. Based on the ContainerExitStatus, it decides
-   * whether a container that exited is marked as complete or failure.
-   */
-  @Override
-  public void onContainerCompleted(ContainerStatus containerStatus) {
-    String containerIdStr = ConverterUtils.toString(containerStatus.getContainerId());
-    int containerId = state.getRunningSamzaContainerId(containerStatus.getContainerId());
-    state.runningContainers.remove(containerId);
-
-    int exitStatus = containerStatus.getExitStatus();
-    switch(exitStatus) {
-      case ContainerExitStatus.SUCCESS:
-        log.info("Container {} completed successfully.", containerIdStr);
-
-        state.completedContainers.incrementAndGet();
-
-        if (SamzaAppState.isValidContainerId(containerId)) {
-          state.finishedContainers.add(containerId);
-          containerFailures.remove(containerId);
-        }
-
-        if (state.completedContainers.get() == state.containerCount) {
-          log.info("Setting job status to SUCCEEDED, since all containers have been marked as completed.");
-          state.status = FinalApplicationStatus.SUCCEEDED;
-        }
-        break;
-
-      case ContainerExitStatus.DISKS_FAILED:
-      case ContainerExitStatus.ABORTED:
-      case ContainerExitStatus.PREEMPTED:
-        log.info("Got an exit code of {}. This means that container {} was "
-            + "killed by YARN, either due to being released by the application "
-            + "master or being 'lost' due to node failures etc. or due to preemption by the RM",
-            exitStatus,
-            containerIdStr);
-
-        state.releasedContainers.incrementAndGet();
-        // If this container was assigned some partitions (a containerId), then
-        // clean up, and request a new container for the tasks. This only
-        // should happen if the container was 'lost' due to node failure, not
-        // if the AM released the container.
-        if (SamzaAppState.isValidContainerId(containerId)) {
-          log.info("Released container {} was assigned task group ID {}. Requesting a new container for the task group.", containerIdStr, containerId);
-
-          state.neededContainers.incrementAndGet();
-          state.jobHealthy.set(false);
-
-          // request a container on new host
-          containerAllocator.requestContainer(containerId, ContainerAllocator.ANY_HOST);
-        }
-
-        break;
-
-      default:
-        // TODO: Handle failure more intelligently. Should track NodeFailures!
-        log.info("Container failed for some reason. Let's start it again");
-        log.info("Container " + containerIdStr + " failed with exit code " + exitStatus + " - " + containerStatus.getDiagnostics());
-
-        state.failedContainers.incrementAndGet();
-        state.failedContainersStatus.put(containerIdStr, containerStatus);
-        state.jobHealthy.set(false);
-
-        if(SamzaAppState.isValidContainerId(containerId)) {
-          state.neededContainers.incrementAndGet();
-          recordContainerFailCount(containerIdStr, containerId);
-
-          if (!tooManyFailedContainers) {
-            // Find out previously running container location
-            String lastSeenOn = state.jobCoordinator.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
-            if (!hostAffinityEnabled || lastSeenOn == null) {
-              lastSeenOn = ContainerAllocator.ANY_HOST;
-            }
-            // Request a new container
-            containerAllocator.requestContainer(containerId, lastSeenOn);
-          }
-        }
-
-    }
-  }
-
-  /**
-   * Increments the failure count, logs the failure, and records the  last failure time for the specified container.
-   * Also, updates the global flag indicating whether too many failures have occurred and returns that flag.
-   *
-   * @param containerIdStr  the YARN container Id for logging purposes.
-   * @param containerId     the Samza container/group Id that failed.
-   * @return                true if any container has failed more than the max number of times.
-   */
-  private boolean recordContainerFailCount(String containerIdStr, int containerId) {
-    // A container failed for an unknown reason. Let's check to see if
-    // we need to shutdown the whole app master if too many container
-    // failures have happened. The rules for failing are that the
-    // failure count for a task group id must be > the configured retry
-    // count, and the last failure (the one prior to this one) must have
-    // happened less than retry window ms ago. If retry count is set to
-    // 0, the app master will fail on any container failure. If the
-    // retry count is set to a number < 0, a container failure will
-    // never trigger an app master failure.
-    int retryCount = yarnConfig.getContainerRetryCount();
-    int retryWindowMs = yarnConfig.getContainerRetryWindowMs();
-
-    if (retryCount == 0) {
-      log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, containerIdStr);
-
-      tooManyFailedContainers = true;
-    } else if (retryCount > 0) {
-      int currentFailCount;
-      long lastFailureTime;
-      if(containerFailures.containsKey(containerId)) {
-        ContainerFailure failure = containerFailures.get(containerId);
-        currentFailCount = failure.getCount() + 1;
-        lastFailureTime = failure.getLastFailure();
-        } else {
-        currentFailCount = 1;
-        lastFailureTime = 0L;
-      }
-      if (currentFailCount >= retryCount) {
-        long lastFailureMsDiff = System.currentTimeMillis() - lastFailureTime;
-
-        if (lastFailureMsDiff < retryWindowMs) {
-          log.error("Container ID " + containerId + "(" + containerIdStr + ") has failed " + currentFailCount +
-              " times, with last failure " + lastFailureMsDiff + "ms ago. This is greater than retry count of " +
-              retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed.");
-
-          // We have too many failures, and we're within the window
-          // boundary, so reset shut down the app master.
-          tooManyFailedContainers = true;
-          state.status = FinalApplicationStatus.FAILED;
-        } else {
-          log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for " +
-              "this container ID was outside the bounds of the retry window.", containerId, containerIdStr);
-
-          // Reset counter back to 1, since the last failure for this
-          // container happened outside the window boundary.
-          containerFailures.put(containerId, new ContainerFailure(1, System.currentTimeMillis()));
-        }
-      } else {
-        log.info("Current fail count for container ID {} is {}.", containerId, currentFailCount);
-        containerFailures.put(containerId, new ContainerFailure(currentFailCount, System.currentTimeMillis()));
-      }
-    }
-    return tooManyFailedContainers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
index 57092e1..7e563f1 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
@@ -53,29 +53,24 @@ public class YarnAppState {
 
   public ConcurrentMap<String, ContainerStatus> failedContainersStatus = new ConcurrentHashMap<String, ContainerStatus>();
 
-  public YarnAppState(JobModelManager jobModelManager,
-                      int taskId,
+  public YarnAppState(int taskId,
                       ContainerId amContainerId,
                       String nodeHost,
                       int nodePort,
-                      int nodeHttpPort,
-                      SamzaApplicationState state) {
-    this.jobModelManager = jobModelManager;
+                      int nodeHttpPort
+                      ) {
     this.taskId = taskId;
     this.amContainerId = amContainerId;
     this.nodeHost = nodeHost;
     this.nodePort = nodePort;
     this.nodeHttpPort = nodeHttpPort;
     this.appAttemptId = amContainerId.getApplicationAttemptId();
-    this.samzaAppState = state;
   }
 
 
   @Override
   public String toString() {
     return "YarnAppState{" +
-        "samzaAppState=" + samzaAppState +
-        ", jobModelReader=" + jobModelManager +
         ", taskId=" + taskId +
         ", amContainerId=" + amContainerId +
         ", nodeHost='" + nodeHost + '\'' +
@@ -90,7 +85,6 @@ public class YarnAppState {
         '}';
   }
 
-  public final SamzaApplicationState samzaAppState;
    /* The following state variables are primarily used for reference in the AM web services   */
 
   /**
@@ -98,7 +92,6 @@ public class YarnAppState {
    * Used for displaying in the AM UI. Usage found in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
    * and scalate/WEB-INF/views/index.scaml
    */
-  public final JobModelManager jobModelManager;
 
   public final int taskId;
   /**
@@ -132,7 +125,7 @@ public class YarnAppState {
   //TODO: Make the below 3 variables immutable. Tracked as a part of SAMZA-902. Save for later.
   /**
    * Job Coordinator URL
-   * Usage in {@link org.apache.samza.job.yarn.SamzaAppMasterService} &amp; YarnContainerRunner
+   * Usage in {@link org.apache.samza.job.yarn.SamzaYarnAppMasterService} &amp; YarnContainerRunner
    */
   public URL coordinatorUrl = null;
   /**