You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/02 20:17:23 UTC

samza git commit: SAMZA-1528: Change ClusterResourceManager to use the async NMClient

Repository: samza
Updated Branches:
  refs/heads/master 93219c78d -> 882f61a69


SAMZA-1528: Change ClusterResourceManager to use the async NMClient

- Rewrite container handling to be asynchronous
- Verified various failure scenarios using unit tests and by deploying a local Samza job.

Author: Jagadish <jv...@linkedin.com>
Author: Fred Ji <ha...@gmail.com>
Author: Srinivasulu Punuru <sp...@linkedin.com>

Reviewers: Jacob Maes<jm...@linkedin.com>, Xinyu Liu<xi...@gmail.com>

Closes #380 from vjagadish1989/cluster-mgr-refactor1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/882f61a6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/882f61a6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/882f61a6

Branch: refs/heads/master
Commit: 882f61a6925d3950e408ec8e202c0e8b6cdff450
Parents: 93219c7
Author: Jagadish <jv...@linkedin.com>
Authored: Tue Jan 2 12:16:51 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Jan 2 12:16:51 2018 -0800

----------------------------------------------------------------------
 .../AbstractContainerAllocator.java             |  17 +-
 .../clustermanager/ClusterResourceManager.java  |  26 +-
 .../clustermanager/ContainerProcessManager.java |  74 ++++-
 .../clustermanager/SamzaApplicationState.java   |   8 +-
 .../samza/clustermanager/SamzaResource.java     |   8 +
 .../samza/operators/impl/OperatorImpl.java      |  16 +-
 .../MockClusterResourceManager.java             |  19 +-
 .../MockClusterResourceManagerCallback.java     |  10 +
 .../clustermanager/TestContainerAllocator.java  |  55 ----
 .../TestContainerProcessManager.java            | 101 +++---
 .../TestHostAwareContainerAllocator.java        |  56 ----
 .../samza/operators/impl/TestOperatorImpl.java  |   1 +
 .../org/apache/samza/job/yarn/YarnAppState.java |   4 +-
 .../job/yarn/YarnClusterResourceManager.java    | 310 +++++++++++++++++--
 .../samza/job/yarn/YarnContainerRunner.java     | 272 ----------------
 15 files changed, 498 insertions(+), 479 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index b83d83c..1b70857 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -151,20 +151,11 @@ public abstract class AbstractContainerAllocator implements Runnable {
     log.info("Found available resources on {}. Assigning request for container_id {} with "
             + "timestamp {} to resource {}",
         new Object[]{preferredHost, String.valueOf(containerID), request.getRequestTimestampMs(), resource.getResourceID()});
-    try {
-      //launches a StreamProcessor on the resource
-      clusterResourceManager.launchStreamProcessor(resource, builder);
 
-      if (state.neededContainers.decrementAndGet() == 0) {
-        state.jobHealthy.set(true);
-      }
-      state.runningContainers.put(request.getContainerID(), resource);
-
-    } catch (SamzaContainerLaunchException e) {
-      log.warn(String.format("Got exception while starting resource %s. Requesting a new resource on any host", resource), e);
-      resourceRequestState.releaseUnstartableContainer(resource);
-      requestResource(containerID, ResourceRequestState.ANY_HOST);
-    }
+    //Register the pending container BEFORE submitting the launch request: the launch callback may fire (even
+    //synchronously) before launchStreamProcessor returns, and it looks the container up in pendingContainers.
+    state.pendingContainers.put(containerID, resource);
+    clusterResourceManager.launchStreamProcessor(resource, builder);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index 715cf66..f8a8c8b 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -105,15 +105,21 @@ public abstract class ClusterResourceManager {
   public abstract void releaseResources(SamzaResource resource);
 
   /***
-   * Requests the launch of a StreamProcessor with the specified context on the resource.
+   * Requests the launch of a StreamProcessor with the specified context on the resource asynchronously.
+   *
+   * <p>
+   *   Either {@link Callback#onStreamProcessorLaunchSuccess(SamzaResource)} or
+   *   {@link Callback#onStreamProcessorLaunchFailure(SamzaResource, Throwable)} will be invoked
+   *   to indicate the result of this operation.
+   * </p>
+   *
    * @param resource the specified resource
    * @param builder A builder implementation that encapsulates the context for the
    *                StreamProcessor. A builder encapsulates the ID for the processor, the
    *                build environment, the command to execute etc.
-   * @throws SamzaContainerLaunchException  when there's an error during the requesting launch.
    *
    */
-  public abstract void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException;
+  public abstract void launchStreamProcessor(SamzaResource resource, CommandBuilder builder);
 
 
   public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
@@ -143,6 +149,20 @@ public abstract class ClusterResourceManager {
      */
     void onResourcesCompleted(List<SamzaResourceStatus> resources);
 
+
+    /**
+     * Callback invoked when the launch request for a StreamProcessor on the {@link SamzaResource} is successful.
+     * @param resource the resource on which the StreamProcessor is launched
+     */
+    void onStreamProcessorLaunchSuccess(SamzaResource resource);
+
+    /**
+     * Callback invoked when there is a failure in launching a StreamProcessor on the provided {@link SamzaResource}.
+     * @param resource the resource on which the StreamProcessor was submitted for launching
+     * @param t the error in launching the StreamProcessor
+     */
+    void onStreamProcessorLaunchFailure(SamzaResource resource, Throwable t);
+
     /***
      * This callback is invoked when there is an error in the ClusterResourceManager. This is
      * guaranteed to be invoked when there is an uncaught exception in any other

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 6a18b84..474ac8c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -105,6 +105,18 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
    */
   private final ContainerProcessManagerMetrics metrics;
 
+  //for testing
+  ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry, AbstractContainerAllocator allocator, ClusterResourceManager manager) {
+    this.state = state;
+    this.clusterManagerConfig = new ClusterManagerConfig(config);
+    this.jobConfig = new JobConfig(config);
+    this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
+    this.clusterResourceManager = manager;
+    this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
+    this.containerAllocator = allocator;
+    this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
+  }
+
   public ContainerProcessManager(Config config,
                                  SamzaApplicationState state,
                                  MetricsRegistryMap registry) {
@@ -153,7 +165,6 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
     log.info("finished initialization of samza task manager");
-
   }
 
   public boolean shouldShutdown() {
@@ -383,6 +394,50 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     }
   }
 
+  @Override
+  public void onStreamProcessorLaunchSuccess(SamzaResource resource) {
+
+    // 1. Obtain the Samza container Id for the pending container on this resource.
+    String containerId = getPendingContainerId(resource.getResourceID());
+    log.info("Successfully started container ID: {} on resource: {}", containerId, resource);
+
+    // 2. Remove the container from the pending buffer and add it to the running buffer. Additionally, update the
+    // job-health metric.
+    if (containerId != null) {
+      log.info("Moving containerID: {} on resource: {} from pending to running state", containerId, resource);
+      state.pendingContainers.remove(containerId);
+      state.runningContainers.put(containerId, resource);
+
+      if (state.neededContainers.decrementAndGet() == 0) {
+        state.jobHealthy.set(true);
+      }
+    } else {
+      log.warn("SamzaResource {} was not in pending state. Got an invalid callback for a launch request that " +
+          "was not issued", resource);
+    }
+  }
+
+  @Override
+  public void onStreamProcessorLaunchFailure(SamzaResource resource, Throwable t) {
+    log.error("Got a launch failure for SamzaResource {}", resource, t);
+    // 1. Release resources for containers that failed back to YARN
+    log.info("Releasing unstartable container {}", resource.getResourceID());
+    clusterResourceManager.releaseResources(resource);
+
+    // 2. Obtain the Samza container Id for the pending container on this resource.
+    String containerId = getPendingContainerId(resource.getResourceID());
+    log.info("Failed container ID: {} for resourceId: {}", containerId, resource.getResourceID());
+    // 3. Drop the stale pending entry and re-request resources on ANY_HOST after a launch failure.
+    if (containerId != null) {
+      log.info("Launch of container ID: {} failed on host: {}. Falling back to ANY_HOST", containerId, resource.getHost());
+      state.pendingContainers.remove(containerId);
+      containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
+    } else {
+      log.warn("SamzaResource {} was not in pending state. Got an invalid callback for a launch request that was " +
+          "not issued", resource);
+    }
+  }
+
   /**
    * An error in the callback terminates the JobCoordinator
    * @param e the underlying exception/error
@@ -419,6 +474,21 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     return factory;
   }
 
+  /**
+   * Obtains the ID of the Samza container pending launch on the provided resource.
+   *
+   * @param resourceId the Id of the resource
+   * @return the Id of the Samza container pending on this resource, or {@code null} if there is none
+   */
+  private String getPendingContainerId(String resourceId) {
+    for (Map.Entry<String, SamzaResource> entry: state.pendingContainers.entrySet()) {
+      if (entry.getValue().getResourceID().equals(resourceId)) {
+        log.info("Matching container ID found {} {}", entry.getKey(), entry.getValue());
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index adc6e51..0dcaace 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -99,7 +99,13 @@ public class SamzaApplicationState {
   public final AtomicInteger neededContainers = new AtomicInteger(0);
 
   /**
-   *  Map of the samzaContainerId to the {@link SamzaResource} on which it is running
+   *  Map of the samzaContainerId to the {@link SamzaResource} on which it is submitted for launch.
+   *  Modified by both the NMCallback and the ContainerAllocator thread.
+   */
+  public final ConcurrentMap<String, SamzaResource> pendingContainers = new ConcurrentHashMap<String, SamzaResource>(0);
+
+  /**
+   *  Map of the samzaContainerId to the {@link SamzaResource} on which it is running.
    *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
    */
   public final ConcurrentMap<String, SamzaResource> runningContainers = new ConcurrentHashMap<String, SamzaResource>(0);

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
index ba6ca2c..2927491 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
@@ -60,6 +60,14 @@ public class SamzaResource {
     return result;
   }
 
+  @Override
+  public String toString() {
+    return "SamzaResource{" +
+        "host='" + host + '\'' +
+        ", resourceID='" + resourceID + '\'' +
+        '}';
+  }
+
   public int getNumCores() {
     return numCores;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 862e5f9..9b2b4cf 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -63,7 +63,7 @@ public abstract class OperatorImpl<M, RM> {
   private Counter numMessage;
   private Timer handleMessageNs;
   private Timer handleTimerNs;
-  private long inputWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
+  private long currentWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
   private long outputWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
   private TaskName taskName;
   // Although the operator node is in the operator graph, the current task may not consume any message in it.
@@ -340,22 +340,22 @@ public abstract class OperatorImpl<M, RM> {
       inputWatermarkMin = prevOperators.stream().map(op -> op.getOutputWatermark()).min(Long::compare).get();
     }
 
-    if (inputWatermark < inputWatermarkMin) {
+    if (currentWatermark < inputWatermarkMin) {
       // advance the watermark time of this operator
-      inputWatermark = inputWatermarkMin;
-      LOG.trace("Advance input watermark to {} in operator {}", inputWatermark, getOpImplId());
+      currentWatermark = inputWatermarkMin;
+      LOG.trace("Advance input watermark to {} in operator {}", currentWatermark, getOpImplId());
 
       final Long outputWm;
       final Collection<RM> output;
       final WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
       if (watermarkFn != null) {
         // user-overrided watermark handling here
-        output = (Collection<RM>) watermarkFn.processWatermark(inputWatermark);
+        output = (Collection<RM>) watermarkFn.processWatermark(currentWatermark);
         outputWm = watermarkFn.getOutputWatermark();
       } else {
         // use samza-provided watermark handling
         // default is to propagate the input watermark
-        output = handleWatermark(inputWatermark, collector, coordinator);
+        output = handleWatermark(currentWatermark, collector, coordinator);
         outputWm = getOutputWatermark();
       }
 
@@ -398,7 +398,7 @@ public abstract class OperatorImpl<M, RM> {
 
   /* package private for testing */
   final long getInputWatermark() {
-    return this.inputWatermark;
+    return this.currentWatermark;
   }
 
   /**
@@ -409,7 +409,7 @@ public abstract class OperatorImpl<M, RM> {
   protected long getOutputWatermark() {
     if (usedInCurrentTask) {
       // default as input
-      return getInputWatermark();
+      return this.currentWatermark;
     } else {
       // always emit the max to indicate no input will be emitted afterwards
       return Long.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index 0d13fb1..452cadc 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -19,10 +19,10 @@
 
 package org.apache.samza.clustermanager;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.samza.job.CommandBuilder;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -47,9 +47,11 @@ public class MockClusterResourceManager extends ClusterResourceManager {
 
   @Override
   public void requestResources(SamzaResourceRequest resourceRequest) {
-    SamzaResource resource = new SamzaResource(resourceRequest.getNumCores(), resourceRequest.getMemoryMB(), resourceRequest.getPreferredHost(), UUID.randomUUID().toString());
-    List<SamzaResource> resources = Collections.singletonList(resource);
-    resourceRequests.addAll(resources);
+    SamzaResource resource = new SamzaResource(resourceRequest.getNumCores(), resourceRequest.getMemoryMB(),
+        resourceRequest.getPreferredHost(), UUID.randomUUID().toString());
+    resourceRequests.add(resource);
+
+    clusterManagerCallback.onResourcesAvailable(ImmutableList.of(resource));
   }
 
   @Override
@@ -63,15 +65,16 @@ public class MockClusterResourceManager extends ClusterResourceManager {
   }
 
   @Override
-  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException {
+  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder)  {
     if (nextException != null) {
-      throw new SamzaContainerLaunchException(nextException);
+      clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(nextException));
+    } else {
+      launchedResources.add(resource);
+      clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
     }
-    launchedResources.add(resource);
     for (MockContainerListener listener : mockContainerListeners) {
       listener.postRunContainer(launchedResources.size());
     }
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
index 5079625..4e6e2c9 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
@@ -38,6 +38,16 @@ public class MockClusterResourceManagerCallback implements ClusterResourceManage
   }
 
   @Override
+  public void onStreamProcessorLaunchSuccess(SamzaResource resource) {
+    // no op
+  }
+
+  @Override
+  public void onStreamProcessorLaunchFailure(SamzaResource resource, Throwable t) {
+    // no op
+  }
+
+  @Override
   public void onError(Throwable e) {
     error = e;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index 734043a..4596673 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -30,7 +30,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
@@ -202,58 +201,4 @@ public class TestContainerAllocator {
     listener.verify();
   }
 
-
-  /**
-   * If the container fails to start e.g because it fails to connect to a NM on a host that
-   * is down, the allocator should request a new container on a different host.
-   */
-  @Test
-  public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-    final SamzaResource container = new SamzaResource(1, 1024, "2", "id0");
-    final SamzaResource container1 = new SamzaResource(1, 1024, "2", "id0");
-    manager.nextException = new IOException("Cant connect to RM");
-
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, new Runnable() {
-      @Override
-      public void run() {
-        // The failed container should be released. The successful one should not.
-        assertNotNull(manager.releasedResources);
-        assertEquals(1, manager.releasedResources.size());
-        assertTrue(manager.releasedResources.contains(container));
-      }
-    },
-        new Runnable() {
-          @Override
-          public void run() {
-            // Test that the first request assignment had a preferred host and the retry didn't
-            assertEquals(2, requestState.assignedRequests.size());
-
-            SamzaResourceRequest request = requestState.assignedRequests.remove();
-            assertEquals("0", request.getContainerID());
-            assertEquals("2", request.getPreferredHost());
-
-            request = requestState.assignedRequests.remove();
-            assertEquals("0", request.getContainerID());
-            assertEquals("ANY_HOST", request.getPreferredHost());
-
-            // This routine should be called after the retry is assigned, but before it's started.
-            // So there should still be 1 container needed.
-            assertEquals(1, state.neededContainers.get());
-          }
-        }, null
-    );
-    state.neededContainers.set(1);
-    requestState.registerContainerListener(listener);
-
-    containerAllocator.requestResource("0", "2");
-    containerAllocator.addResource(container);
-    containerAllocator.addResource(container1);
-    allocatorThread.start();
-
-    listener.verify();
-
-  }
-
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index e252b7d..5c2fe4a 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -48,7 +48,7 @@ import static org.mockito.Mockito.when;
 
 public class TestContainerProcessManager {
   private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
-  private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);
+  private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback);
 
   private static volatile boolean isRunning = false;
 
@@ -125,7 +125,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        manager
+        clusterResourceManager
     );
 
     AbstractContainerAllocator allocator =
@@ -145,7 +145,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        manager
+        clusterResourceManager
     );
 
     allocator =
@@ -165,11 +165,11 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        manager
+        clusterResourceManager
     );
 
     MockContainerAllocator allocator = new MockContainerAllocator(
-        manager,
+        clusterResourceManager,
         conf,
         state);
 
@@ -207,7 +207,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        manager
+        clusterResourceManager
     );
     taskManager.start();
 
@@ -231,11 +231,11 @@ public class TestContainerProcessManager {
             new MapConfig(conf),
             state,
             new MetricsRegistryMap(),
-            manager
+        clusterResourceManager
     );
 
     MockContainerAllocator allocator = new MockContainerAllocator(
-            manager,
+        clusterResourceManager,
             conf,
             state);
 
@@ -258,6 +258,7 @@ public class TestContainerProcessManager {
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
+    taskManager.onStreamProcessorLaunchSuccess(container);
     assertFalse(taskManager.shouldShutdown());
 
     taskManager.onResourceCompleted(new SamzaResourceStatus("id0", "diagnostics", SamzaResourceStatus.SUCCESS));
@@ -278,11 +279,11 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        manager
+        clusterResourceManager
     );
 
     MockContainerAllocator allocator = new MockContainerAllocator(
-        manager,
+        clusterResourceManager,
         conf,
         state);
 
@@ -305,7 +306,7 @@ public class TestContainerProcessManager {
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
-
+    taskManager.onStreamProcessorLaunchSuccess(container);
     // Create first container failure
     taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "diagnostics", 1));
 
@@ -316,8 +317,8 @@ public class TestContainerProcessManager {
 
     assertFalse(taskManager.shouldShutdown());
     assertFalse(state.jobHealthy.get());
-    assertEquals(2, manager.resourceRequests.size());
-    assertEquals(0, manager.releasedResources.size());
+    assertEquals(2, clusterResourceManager.resourceRequests.size());
+    assertEquals(0, clusterResourceManager.releasedResources.size());
 
     taskManager.onResourceAllocated(container);
 
@@ -325,7 +326,7 @@ public class TestContainerProcessManager {
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
-
+    taskManager.onStreamProcessorLaunchSuccess(container);
     assertTrue(state.jobHealthy.get());
 
     // Create a second failure
@@ -334,8 +335,8 @@ public class TestContainerProcessManager {
 
     // The above failure should trigger a job shutdown because our retry count is set to 1
     assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
-    assertEquals(2, manager.resourceRequests.size());
-    assertEquals(0, manager.releasedResources.size());
+    assertEquals(2, clusterResourceManager.resourceRequests.size());
+    assertEquals(0, clusterResourceManager.releasedResources.size());
     assertFalse(state.jobHealthy.get());
     assertTrue(taskManager.shouldShutdown());
     assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, state.status);
@@ -355,11 +356,11 @@ public class TestContainerProcessManager {
             new MapConfig(conf),
             state,
             new MetricsRegistryMap(),
-            manager
+        clusterResourceManager
     );
 
     MockContainerAllocator allocator = new MockContainerAllocator(
-            manager,
+        clusterResourceManager,
             conf,
             state);
     getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
@@ -367,7 +368,7 @@ public class TestContainerProcessManager {
     Thread thread = new Thread(allocator);
     getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
 
-    // Start the task manager
+    // Start the task clusterResourceManager
     taskManager.start();
 
     SamzaResource container = new SamzaResource(1, 1000, "abc", "id1");
@@ -389,6 +390,29 @@ public class TestContainerProcessManager {
   }
 
   @Test
+  public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
+    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(1));
+    Map<String, String> configMap = new HashMap<>();
+    configMap.putAll(getConfig());
+
+    MockContainerAllocator allocator = new MockContainerAllocator(
+        clusterResourceManager,
+        new MapConfig(config),
+        state);
+
+    ContainerProcessManager manager = new ContainerProcessManager(config, state, new MetricsRegistryMap(), allocator,
+        clusterResourceManager);
+
+    manager.start();
+    SamzaResource resource = new SamzaResource(1, 1024, "abc", "resource-1");
+    state.pendingContainers.put("1", resource);
+    Assert.assertEquals(1, clusterResourceManager.resourceRequests.size());
+    manager.onStreamProcessorLaunchFailure(resource, new Exception("cannot launch container!"));
+    Assert.assertEquals(2, clusterResourceManager.resourceRequests.size());
+    Assert.assertEquals(ResourceRequestState.ANY_HOST, clusterResourceManager.resourceRequests.get(1).getHost());
+  }
+
+  @Test
   public void testDuplicateNotificationsDoNotAffectJobHealth() throws Exception {
     Config conf = getConfig();
 
@@ -400,11 +424,11 @@ public class TestContainerProcessManager {
             new MapConfig(conf),
             state,
             new MetricsRegistryMap(),
-            manager
+        clusterResourceManager
     );
 
     MockContainerAllocator allocator = new MockContainerAllocator(
-            manager,
+        clusterResourceManager,
             conf,
             state);
     getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
@@ -424,6 +448,7 @@ public class TestContainerProcessManager {
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
+    taskManager.onStreamProcessorLaunchSuccess(container1);
     assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
 
     // Create container failure - with ContainerExitStatus.DISKS_FAILED
@@ -433,8 +458,8 @@ public class TestContainerProcessManager {
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
     assertFalse(taskManager.shouldShutdown());
     assertFalse(state.jobHealthy.get());
-    assertEquals(2, manager.resourceRequests.size());
-    assertEquals(0, manager.releasedResources.size());
+    assertEquals(2, clusterResourceManager.resourceRequests.size());
+    assertEquals(0, clusterResourceManager.releasedResources.size());
     assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
 
     SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2");
@@ -444,14 +469,16 @@ public class TestContainerProcessManager {
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
+    taskManager.onStreamProcessorLaunchSuccess(container2);
+
     assertTrue(state.jobHealthy.get());
 
     // Simulate a duplicate notification for container 1 with a different exit code
     taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.PREEMPTED));
     // assert that a duplicate notification does not change metrics (including job health)
     assertEquals(state.redundantNotifications.get(), 1);
-    assertEquals(2, manager.resourceRequests.size());
-    assertEquals(0, manager.releasedResources.size());
+    assertEquals(2, clusterResourceManager.resourceRequests.size());
+    assertEquals(0, clusterResourceManager.releasedResources.size());
     assertTrue(state.jobHealthy.get());
   }
 
@@ -471,11 +498,11 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        manager
+        clusterResourceManager
     );
 
     MockContainerAllocator allocator = new MockContainerAllocator(
-        manager,
+        clusterResourceManager,
         conf,
         state);
     getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
@@ -483,7 +510,7 @@ public class TestContainerProcessManager {
     Thread thread = new Thread(allocator);
     getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
 
-    // Start the task manager
+    // Start the task manager
     taskManager.start();
     assertFalse(taskManager.shouldShutdown());
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
@@ -496,7 +523,7 @@ public class TestContainerProcessManager {
       fail("timed out waiting for the containers to start");
     }
     assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
-
+    taskManager.onStreamProcessorLaunchSuccess(container1);
     // Create container failure - with ContainerExitStatus.DISKS_FAILED
     taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL));
 
@@ -504,8 +531,8 @@ public class TestContainerProcessManager {
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
     assertFalse(taskManager.shouldShutdown());
     assertFalse(state.jobHealthy.get());
-    assertEquals(2, manager.resourceRequests.size());
-    assertEquals(0, manager.releasedResources.size());
+    assertEquals(2, clusterResourceManager.resourceRequests.size());
+    assertEquals(0, clusterResourceManager.releasedResources.size());
     assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
 
     SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2");
@@ -515,10 +542,11 @@ public class TestContainerProcessManager {
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
+    taskManager.onStreamProcessorLaunchSuccess(container2);
 
     // Create container failure - with ContainerExitStatus.PREEMPTED
     taskManager.onResourceCompleted(new SamzaResourceStatus(container2.getResourceID(), "Preemption",  SamzaResourceStatus.PREEMPTED));
-    assertEquals(3, manager.resourceRequests.size());
+    assertEquals(3, clusterResourceManager.resourceRequests.size());
 
     // The above failure should trigger a container request
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
@@ -532,14 +560,15 @@ public class TestContainerProcessManager {
     if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
       fail("timed out waiting for the containers to start");
     }
+    taskManager.onStreamProcessorLaunchSuccess(container3);
 
     // Create container failure - with ContainerExitStatus.ABORTED
     taskManager.onResourceCompleted(new SamzaResourceStatus(container3.getResourceID(), "Aborted", SamzaResourceStatus.ABORTED));
 
     // The above failure should trigger a container request
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
-    assertEquals(4, manager.resourceRequests.size());
-    assertEquals(0, manager.releasedResources.size());
+    assertEquals(4, clusterResourceManager.resourceRequests.size());
+    assertEquals(0, clusterResourceManager.releasedResources.size());
     assertFalse(taskManager.shouldShutdown());
     assertFalse(state.jobHealthy.get());
     assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
@@ -556,7 +585,7 @@ public class TestContainerProcessManager {
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
-        manager
+        clusterResourceManager
     );
     taskManager.start();
     SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
@@ -570,7 +599,7 @@ public class TestContainerProcessManager {
         new MapConfig(config),
         state,
         new MetricsRegistryMap(),
-        manager
+        clusterResourceManager
     );
     taskManager1.start();
     taskManager1.onResourceAllocated(container2);

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index 00198e9..6260b71 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.clustermanager;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
@@ -204,61 +203,6 @@ public class TestHostAwareContainerAllocator {
   }
 
   /**
-   * If the container fails to start e.g because it fails to connect to a NM on a host that
-   * is down, the allocator should request a new container on a different host.
-   */
-  @Test
-  public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-
-    final SamzaResource container = new SamzaResource(1, 1024, "2", "id0");
-    final SamzaResource container1 = new SamzaResource(1, 1024, "1", "id1");
-    manager.nextException = new IOException("Cant connect to RM");
-
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, new Runnable() {
-      @Override
-      public void run() {
-        // The failed container should be released. The successful one should not.
-        assertNotNull(manager.releasedResources);
-        assertEquals(1, manager.releasedResources.size());
-        assertTrue(manager.releasedResources.contains(container));
-      }
-    },
-        new Runnable() {
-          @Override
-          public void run() {
-            // Test that the first request assignment had a preferred host and the retry didn't
-            assertEquals(2, requestState.assignedRequests.size());
-
-            SamzaResourceRequest request = requestState.assignedRequests.remove();
-            assertEquals("0", request.getContainerID());
-            assertEquals("2", request.getPreferredHost());
-
-            request = requestState.assignedRequests.remove();
-            assertEquals("0", request.getContainerID());
-            assertEquals("ANY_HOST", request.getPreferredHost());
-
-            // This routine should be called after the retry is assigned, but before it's started.
-            // So there should still be 1 container needed.
-            assertEquals(1, state.neededContainers.get());
-          }
-        }, null
-    );
-    state.neededContainers.set(1);
-    requestState.registerContainerListener(listener);
-
-    // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
-    containerAllocator.requestResource("0", "2");
-    containerAllocator.addResource(container1);
-    containerAllocator.addResource(container);
-
-    allocatorThread.start();
-
-    listener.verify();
-  }
-
-
-  /**
    * Handles expired requests correctly and assigns ANY_HOST
    */
 

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 904367b..a4f0354 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators.impl;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.Counter;

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
index db67de6..fb9a9c2 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
@@ -49,7 +49,9 @@ public class YarnAppState {
   * Modified by both the AMRMCallbackThread and the ContainerAllocator thread
   */
 
-  public Map<String, YarnContainer> runningYarnContainers = new ConcurrentHashMap<String, YarnContainer>()  ;
+  public Map<String, YarnContainer> runningYarnContainers = new ConcurrentHashMap<String, YarnContainer>();
+
+  public Map<String, YarnContainer> pendingYarnContainers = new ConcurrentHashMap<String, YarnContainer>();
 
   public ConcurrentMap<String, ContainerStatus> failedContainersStatus = new ConcurrentHashMap<String, ContainerStatus>();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 96a4488..9be8475 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -19,30 +19,46 @@
 
 package org.apache.samza.job.yarn;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.*;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.samza.SamzaException;
 import org.apache.samza.clustermanager.*;
 import org.apache.samza.clustermanager.SamzaApplicationState;
 import org.apache.samza.clustermanager.SamzaContainerLaunchException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.YarnConfig;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.CommandBuilder;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.util.Util;
 import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -61,7 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  *
  */
 
-public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler {
+public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
 
   private final String INVALID_YARN_CONTAINER_ID = "-1";
 
@@ -71,14 +87,9 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
 
   /**
-   * A helper class to launch Yarn containers.
-   */
-  private final YarnContainerRunner yarnContainerRunner;
-
-  /**
    * Configuration and state specific to Yarn.
    */
-  private final YarnConfiguration hConfig;
+  private final YarnConfiguration yarnConfiguration;
   private final YarnAppState state;
 
   /**
@@ -100,12 +111,16 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap<>();
   private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>();
 
+  private final ConcurrentHashMap<ContainerId, Container> containersPendingStartup = new ConcurrentHashMap<>();
+
   private final SamzaAppMasterMetrics metrics;
 
   final AtomicBoolean started = new AtomicBoolean(false);
   private final Object lock = new Object();
+  private final NMClientAsync nmClientAsync;
 
   private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class);
+  private final Config config;
 
   /**
    * Creates an YarnClusterResourceManager from config, a jobModelReader and a callback.
@@ -116,15 +131,15 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
    */
   public YarnClusterResourceManager(Config config, JobModelManager jobModelManager, ClusterResourceManager.Callback callback, SamzaApplicationState samzaAppState ) {
     super(callback);
-    hConfig = new YarnConfiguration();
-    hConfig.set("fs.http.impl", HttpFileSystem.class.getName());
+    yarnConfiguration = new YarnConfiguration();
+    yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
 
     // Use the Samza job config "fs.<scheme>.impl" and "fs.<scheme>.impl.*" for YarnConfiguration
     FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config);
     fsImplConfig.getSchemes().forEach(
         scheme -> {
           fsImplConfig.getSchemeConfig(scheme).forEach(
-              (confKey, confValue) -> hConfig.set(confKey, confValue)
+              (confKey, confValue) -> yarnConfiguration.set(confKey, confValue)
           );
         }
     );
@@ -143,6 +158,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     int nodeHttpPort = Integer.parseInt(nodeHttpPortString);
     YarnConfig yarnConfig = new YarnConfig(config);
     this.yarnConfig = yarnConfig;
+    this.config = config;
     int interval = yarnConfig.getAMPollIntervalMs();
 
     //Instantiate the AM Client.
@@ -151,7 +167,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     this.state = new YarnAppState(-1, containerId, nodeHostString, nodePort, nodeHttpPort);
 
     log.info("Initialized YarnAppState: {}", state.toString());
-    this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, hConfig);
+    this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, yarnConfiguration);
 
     log.info("ContainerID str {}, Nodehost  {} , Nodeport  {} , NodeHttpport {}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort});
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
@@ -162,8 +178,8 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
         state,
         amClient
     );
+    this.nmClientAsync = NMClientAsync.createNMClientAsync(this);
 
-    yarnContainerRunner = new YarnContainerRunner(config, hConfig);
   }
 
   /**
@@ -179,8 +195,10 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     metrics.start();
     service.onInit();
     log.info("Starting YarnContainerManager.");
-    amClient.init(hConfig);
+    amClient.init(yarnConfiguration);
     amClient.start();
+    nmClientAsync.init(yarnConfiguration);
+    nmClientAsync.start();
     lifecycle.onInit();
 
     if(lifecycle.shouldShutdown()) {
@@ -262,19 +280,22 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
    */
 
   @Override
-  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException {
+  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
     String containerIDStr = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
-    log.info("Received launch request for {} on hostname {}", containerIDStr , resource.getHost());
-
+    log.info("Received launch request for {} on hostname {}", containerIDStr, resource.getHost());
     synchronized (lock) {
-      Container container = allocatedResources.get(resource);
-      if (container == null) {
-        log.info("Resource {} already released. ", resource);
-        return;
-      }
+      // Launch errors are not thrown to the caller: any failure while preparing or submitting the
+      // start request is reported through clusterManagerCallback.onStreamProcessorLaunchFailure below.
+      try {
+        Container container = allocatedResources.get(resource);
+        // No allocated container for this resource means it was already released; nothing to launch.
+        if (container == null) {
+          log.info("Resource {} already released. ", resource);
+          return;
+        }
 
-      state.runningYarnContainers.put(containerIDStr, new YarnContainer(container));
-      yarnContainerRunner.runContainer(containerIDStr, container, builder);
+        // runContainer only submits an asynchronous start request; the outcome is delivered later via
+        // the NMClientAsync callbacks (onContainerStarted / onStartContainerError).
+        runContainer(containerIDStr, container, builder);
+      } catch (Throwable t) {
+        log.error("Error in launching stream processor:", t);
+        clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t);
+      }
     }
   }
 
@@ -338,6 +359,8 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     lifecycle.onShutdown(status);
     amClient.stop();
     log.info("Stopping the AM service " );
+    nmClientAsync.stop();
+    log.info("Stopping the NM service " );
     service.onShutdown();
     metrics.stop();
 
@@ -358,7 +381,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
 
       FileSystem fs = null;
       try {
-        fs = FileSystem.get(hConfig);
+        fs = FileSystem.get(yarnConfiguration);
       } catch (IOException e) {
         log.error("Unable to clean up file system: {}", e);
         return;
@@ -454,4 +477,243 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     clusterManagerCallback.onError(e);
   }
 
+  /**
+   * NodeManager callback for a successful start request. Moves the matching Samza container from the
+   * pending set to the running set and reports success through the cluster manager callback.
+   *
+   * @param containerId        the YARN id of the container that started
+   * @param allServiceResponse auxiliary-service responses from the NodeManager (unused)
+   */
+  @Override
+  public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
+    log.info("Received a containerStart notification from the NodeManager for container: {} ", containerId);
+    String samzaContainerId = getPendingSamzaContainerId(containerId);
+
+    // Rely on the value returned by remove(): the entry can disappear between the lookup above and here,
+    // and dereferencing a null YarnContainer would throw on the NM callback thread.
+    final YarnContainer container = samzaContainerId == null
+        ? null
+        : state.pendingYarnContainers.remove(samzaContainerId);
+
+    if (container != null) {
+      // 1. Move the container from pending to running state
+      log.info("Samza containerId:{} has started", samzaContainerId);
+      state.runningYarnContainers.put(samzaContainerId, container);
+
+      // 2. Invoke the success callback.
+      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
+          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
+      clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
+    } else {
+      log.info("Got an invalid notification from YARN for container: {}", containerId);
+    }
+  }
+
+  /**
+   * NodeManager callback carrying a container status report. The report is only logged.
+   */
+  @Override
+  public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
+    ContainerState reportedState = containerStatus.getState();
+    log.info("Got a status from the NodeManager. Container: {} Status: {}", containerId, reportedState);
+  }
+
+  /**
+   * NodeManager callback signalling that a container was stopped. The event is only logged.
+   */
+  @Override
+  public void onContainerStopped(ContainerId containerId) {
+    final String stoppedMessage = "Got a notification from the NodeManager for a stopped container. ContainerId: {}";
+    log.info(stoppedMessage, containerId);
+  }
+
+  /**
+   * NodeManager callback for a failed start request. Removes the matching Samza container from the
+   * pending set and reports the failure through
+   * {@code clusterManagerCallback.onStreamProcessorLaunchFailure}.
+   *
+   * @param containerId the YARN id of the container that failed to start
+   * @param t           the error reported by the NodeManager client
+   */
+  @Override
+  public void onStartContainerError(ContainerId containerId, Throwable t) {
+    log.error(String.format("Container: %s could not start.", containerId), t);
+
+    // runContainer registers launches in state.pendingYarnContainers, so resolve the container there.
+    // Looking it up anywhere else means the failure callback never fires and the pending entry leaks.
+    String samzaContainerId = getPendingSamzaContainerId(containerId);
+    YarnContainer container = samzaContainerId == null
+        ? null
+        : state.pendingYarnContainers.remove(samzaContainerId);
+
+    if (container != null) {
+      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
+          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
+      log.info("Invoking failure callback for container: {}", containerId);
+      clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(t));
+    } else {
+      log.info("Got an invalid notification for container: {}", containerId);
+    }
+  }
+
+  /**
+   * NodeManager callback for a failed container-status query. The error is only logged.
+   */
+  @Override
+  public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+    // The Throwable is passed as the trailing argument with no placeholder so SLF4J logs its stack trace.
+    log.error("Got an error on getContainerStatus from the NodeManager. ContainerId: {}", containerId, t);
+  }
+
+  /**
+   * NodeManager callback for a failed stop request. The error is only logged.
+   */
+  @Override
+  public void onStopContainerError(ContainerId containerId, Throwable t) {
+    // The Throwable is passed as the trailing argument with no placeholder so SLF4J logs its stack trace.
+    log.error("Got an error when stopping container from the NodeManager. ContainerId: {}", containerId, t);
+  }
+
+  /**
+   * Prepares the command and environment for a Samza container and submits an asynchronous start
+   * request for it to the NodeManager. The container is tracked in {@code state.pendingYarnContainers}
+   * until a NodeManager callback resolves it.
+   *
+   * @param samzaContainerId id of the Samza container to run (passed to the process via its environment)
+   * @param container        the allocated YARN container to run on
+   * @param cmdBuilder       the command builder that encapsulates the command and its environment
+   * @throws IOException if the launch context cannot be built (e.g. package status or credentials unreadable)
+   */
+  public void runContainer(String samzaContainerId, Container container, CommandBuilder cmdBuilder) throws IOException {
+    String containerIdStr = ConverterUtils.toString(container.getId());
+    log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);
+
+    // check if we have framework path specified. If yes - use it, if not use default ./__package/
+    String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries
+    String cmdPath = "./__package/";
+
+    String fwkPath = JobConfig.getFwkPath(this.config);
+    if (fwkPath != null && !fwkPath.isEmpty()) {
+      cmdPath = fwkPath;
+      jobLib = "export JOB_LIB_DIR=./__package/lib";
+    }
+    log.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + cmdPath + ";jobLib=" + jobLib);
+    cmdBuilder.setCommandPath(cmdPath);
+
+    String command = cmdBuilder.buildCommand();
+    log.info("Container ID {} using command {}", samzaContainerId, command);
+
+    Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
+    env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
+    printContainerEnvironmentVariables(samzaContainerId, env);
+
+    Path packagePath = new Path(yarnConfig.getPackagePath());
+    log.info("Starting container ID {} using package path {}", samzaContainerId, packagePath);
+
+    // Register as pending before submitting, so the NodeManager callbacks can resolve this container.
+    state.pendingYarnContainers.put(samzaContainerId, new YarnContainer(container));
+    try {
+      startContainer(
+          packagePath,
+          container,
+          env,
+          getFormattedCommand(
+              ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+              jobLib,
+              command,
+              ApplicationConstants.STDOUT,
+              ApplicationConstants.STDERR)
+      );
+    } catch (Throwable t) {
+      // Failures here happen while building the launch context, before the async request is made,
+      // so presumably no NodeManager callback will ever clear this entry: drop it before rethrowing.
+      state.pendingYarnContainers.remove(samzaContainerId);
+      throw t;
+    }
+
+    log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).",
+        new Object[]{
+            samzaContainerId,
+            containerIdStr,
+            container.getNodeId().getHost(),
+            container.getNodeHttpAddress(),
+            containerIdStr}
+    );
+
+    log.info("Started container ID {}", samzaContainerId);
+  }
+
+  /**
+   * Builds a {@link ContainerLaunchContext} that runs {@code cmd} on {@code container} and submits it to the
+   * NodeManager through {@code nmClientAsync}. All binaries needed by the process are localized from the
+   * archive at {@code packagePath} (as {@code __package}), plus any resources from {@link LocalizerResourceConfig}.
+   *
+   * @param packagePath location of the job package archive
+   * @param container   the allocated YARN container to launch on
+   * @param env         environment variables for the launched process
+   * @param cmd         the shell command to execute in the container
+   * @throws IOException if the package file status or the current user's credentials cannot be read
+   */
+  private void startContainer(Path packagePath,
+                              Container container,
+                              Map<String, String> env,
+                              final String cmd) throws IOException {
+    log.info("Starting container {} {} {} {}",
+        new Object[]{packagePath, container, env, cmd});
+
+    // Localize the job package as an archive named __package in the container's working directory.
+    LocalResource packageResource = Records.newRecord(LocalResource.class);
+    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
+    FileStatus fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
+    packageResource.setResource(packageUrl);
+    log.info("Set package resource in YarnClusterResourceManager for {}", packageUrl);
+    packageResource.setSize(fileStatus.getLen());
+    packageResource.setTimestamp(fileStatus.getModificationTime());
+    packageResource.setType(LocalResourceType.ARCHIVE);
+    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+    // Copy the current user's tokens for the container, minus the AM->RM token so containers cannot access it.
+    // The token must be removed BEFORE serializing: the serialized bytes are what the container receives.
+    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+    Iterator<org.apache.hadoop.security.token.Token<? extends TokenIdentifier>> iter =
+        credentials.getAllTokens().iterator();
+    while (iter.hasNext()) {
+      TokenIdentifier token = iter.next().decodeIdentifier();
+      if (token != null && token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove();
+      }
+    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    ByteBuffer allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    Map<String, LocalResource> localResourceMap = new HashMap<>();
+    localResourceMap.put("__package", packageResource);
+
+    // include the resources from the universal resource configurations
+    LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration);
+    localResourceMap.putAll(resourceMapper.getResourceMap());
+
+    // Plain list instead of double-brace initialization (which creates an anonymous class capturing 'this').
+    List<String> commands = new ArrayList<>();
+    commands.add(cmd);
+
+    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
+    context.setEnvironment(env);
+    context.setTokens(allTokens.duplicate());
+    context.setCommands(commands);
+    context.setLocalResources(localResourceMap);
+
+    log.debug("Setting localResourceMap to {}", localResourceMap);
+    log.debug("Setting context to {}", context);
+
+    log.info("Making an async start request for container {}", container);
+    nmClientAsync.startContainerAsync(container, context);
+  }
+
+  /**
+   * Logs every environment variable of a container, one {@code NAME=value} pair per line.
+   *
+   * @param samzaContainerId  the Samza container id, used only in the log message
+   * @param env               environment variable names mapped to their values
+   */
+  private void printContainerEnvironmentVariables(String samzaContainerId, Map<String, String> env) {
+    StringBuilder formatted = new StringBuilder();
+    env.forEach((name, value) -> formatted.append('\n').append(name).append('=').append(value));
+    log.info("Container ID {} using environment variables: {}", samzaContainerId, formatted.toString());
+  }
+
+
+  /**
+   * Copies the environment built by {@code cmdBuilder} into a new mutable map, escaping each value
+   * with {@code Util.envVarEscape}.
+   *
+   * @param cmdBuilder  the command builder supplying the environment variables
+   * @return            a new, mutable map of variable names to escaped values
+   */
+  private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
+    Map<String, String> escaped = new HashMap<String, String>();
+    cmdBuilder.buildEnvironment().forEach((key, rawValue) -> escaped.put(key, Util.envVarEscape(rawValue)));
+    return escaped;
+  }
+
+
+  /**
+   * Builds the shell line run in the container. It exports SAMZA_LOG_DIR, optionally chains the
+   * job-library export, links {@code logs} to the log directory, and execs the command with its
+   * stdout/stderr redirected into that directory.
+   */
+  private String getFormattedCommand(String logDirExpansionVar,
+                                     String jobLib,
+                                     String command,
+                                     String stdOut,
+                                     String stdErr) {
+    // A non-empty job-library export is chained after the SAMZA_LOG_DIR export.
+    String libExport = jobLib.isEmpty() ? jobLib : "&& " + jobLib;
+    return String.format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s",
+        logDirExpansionVar, libExport, logDirExpansionVar, command, stdOut, stdErr);
+  }
+
+  /**
+   * Returns the id of the pending Samza container that corresponds to the provided Yarn {@link ContainerId}.
+   *
+   * @param containerId the Yarn ContainerId
+   * @return the Samza container id, or {@code null} if no pending container matches
+   */
+  private String getPendingSamzaContainerId(ContainerId containerId) {
+    // Iterate entries rather than keySet()+get(): the map is modified concurrently (NM callbacks vs. the
+    // launching thread), and a key removed between the two calls would yield a null value and an NPE.
+    for (Map.Entry<String, YarnContainer> entry : state.pendingYarnContainers.entrySet()) {
+      if (entry.getValue().id().equals(containerId)) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
deleted file mode 100644
index cdcf2d1..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.samza.clustermanager.SamzaContainerLaunchException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.job.CommandBuilder;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * A helper class to run container processes on Yarn. It encapsulates the YarnContainer boilerplate:
- * <ul>
- *   <li>building the launch command and environment;</li>
- *   <li>localizing the job package and configured resources;</li>
- *   <li>passing security tokens;</li>
- *   <li>submitting the {@link ContainerLaunchContext} through an {@link NMClient}.</li>
- * </ul>
- */
-public class YarnContainerRunner {
-  private static final Logger log = LoggerFactory.getLogger(YarnContainerRunner.class);
-
-  private final Config config;
-  private final YarnConfiguration yarnConfiguration;
-
-  private final NMClient nmClient;
-  private final YarnConfig yarnConfig;
-
-  /**
-   * Create a new Runner from a Config.
-   *
-   * @param config to instantiate the runner with
-   * @param yarnConfiguration the yarn config for the cluster to connect to.
-   */
-  public YarnContainerRunner(Config config,
-                             YarnConfiguration yarnConfiguration) {
-    this.config = config;
-    this.yarnConfiguration = yarnConfiguration;
-
-    // NOTE(review): the client is init()-ed but never start()-ed or stop()-ed in this class;
-    // confirm that is sufficient for the NMClient implementation in use.
-    this.nmClient = NMClient.createNMClient();
-    nmClient.init(this.yarnConfiguration);
-
-    this.yarnConfig = new YarnConfig(config);
-  }
-
-  /**
-   * Runs a process as specified by the command builder on the container.
-   *
-   * @param samzaContainerId id of the samza Container to run (passed as a command line parameter to the process)
-   * @param container the Yarn container to launch the process in.
-   * @param cmdBuilder the command builder that encapsulates the command, and the context
-   * @throws SamzaContainerLaunchException when the package status or credentials cannot be read, or the
-   *         NodeManager start request fails.
-   */
-  public void runContainer(String samzaContainerId, Container container, CommandBuilder cmdBuilder) throws SamzaContainerLaunchException {
-    String containerIdStr = ConverterUtils.toString(container.getId());
-    log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);
-
-    // Use the framework path when one is configured; otherwise run from the default ./__package/.
-    // With a separate framework, JOB_LIB_DIR points the process at the job's own libraries.
-    String jobLib = "";
-    String cmdPath = "./__package/";
-
-    String fwkPath = JobConfig.getFwkPath(config);
-    if (fwkPath != null && !fwkPath.isEmpty()) {
-      cmdPath = fwkPath;
-      jobLib = "export JOB_LIB_DIR=./__package/lib";
-    }
-    log.info("In runContainer in util: fwkPath= {};cmdPath={};jobLib={}", fwkPath, cmdPath, jobLib);
-    cmdBuilder.setCommandPath(cmdPath);
-
-    String command = cmdBuilder.buildCommand();
-    log.info("Container ID {} using command {}", samzaContainerId, command);
-
-    Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
-    env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
-    printContainerEnvironmentVariables(samzaContainerId, env);
-
-    log.info("Samza FWK path: {}; env={}", command, env);
-
-    Path packagePath = new Path(yarnConfig.getPackagePath());
-    log.info("Starting container ID {} using package path {}", samzaContainerId, packagePath);
-
-    startContainer(
-        packagePath,
-        container,
-        env,
-        getFormattedCommand(
-            ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-            jobLib,
-            command,
-            ApplicationConstants.STDOUT,
-            ApplicationConstants.STDERR)
-    );
-
-    log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).",
-        new Object[]{
-            samzaContainerId,
-            containerIdStr,
-            container.getNodeId().getHost(),
-            container.getNodeHttpAddress(),
-            containerIdStr}
-    );
-
-    log.info("Started container ID {}", samzaContainerId);
-  }
-
-  /**
-   * Runs a command as a process on the container. All binaries needed by the physical process are packaged in the
-   * archive at {@code packagePath}, which is localized into the container under the name {@code __package}.
-   *
-   * @param packagePath location of the job package archive
-   * @param container   the Yarn container to launch in
-   * @param env         environment variables for the launched process
-   * @param cmd         the full shell command to run
-   * @throws SamzaContainerLaunchException if the package status or credentials cannot be read, or the
-   *         NodeManager start request fails
-   */
-  private void startContainer(Path packagePath,
-                              Container container,
-                              Map<String, String> env,
-                              final String cmd) throws SamzaContainerLaunchException {
-    log.info("starting container {} {} {} {}",
-        new Object[]{packagePath, container, env, cmd});
-
-    // TODO: SAMZA-1144 remove the customized approach for package resource and use the common one.
-    // But keep it now for backward compatibility.
-    // set the local package so that the containers and app master are provisioned with it
-    LocalResource packageResource = Records.newRecord(LocalResource.class);
-    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
-    FileStatus fileStatus;
-    try {
-      fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
-    } catch (IOException ioe) {
-      log.error("IO Exception when accessing the package status from the filesystem", ioe);
-      throw new SamzaContainerLaunchException("IO Exception when accessing the package status from the filesystem", ioe);
-    }
-
-    packageResource.setResource(packageUrl);
-    log.info("set package Resource in YarnContainerRunner for {}", packageUrl);
-    packageResource.setSize(fileStatus.getLen());
-    packageResource.setTimestamp(fileStatus.getModificationTime());
-    packageResource.setType(LocalResourceType.ARCHIVE);
-    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
-
-    ByteBuffer allTokens;
-    // copy tokens (adapted from the dist shell example)
-    try {
-      Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-
-      // Remove the AM->RM token BEFORE serializing so that containers cannot access it. Removing it only
-      // after writeTokenStorageToStream() leaves the token in the serialized buffer handed to containers.
-      // NOTE(review): assumes getCredentials() returns a copy and getAllTokens() is a live view of it,
-      // so this affects only what is shipped to the container; confirm against the Hadoop version in use.
-      Iterator<Token<? extends TokenIdentifier>> iter = credentials.getAllTokens().iterator();
-      while (iter.hasNext()) {
-        Token<? extends TokenIdentifier> token = iter.next();
-        if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())) {
-          iter.remove();
-        }
-      }
-
-      DataOutputBuffer dob = new DataOutputBuffer();
-      credentials.writeTokenStorageToStream(dob);
-      allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    } catch (IOException ioe) {
-      log.error("IOException when writing credentials.", ioe);
-      throw new SamzaContainerLaunchException("IO Exception when writing credentials to output buffer", ioe);
-    }
-
-    Map<String, LocalResource> localResourceMap = new HashMap<>();
-    localResourceMap.put("__package", packageResource);
-
-    // include the resources from the universal resource configurations
-    LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration);
-    localResourceMap.putAll(resourceMapper.getResourceMap());
-
-    // A plain list rather than double-brace initialization, which would create an anonymous
-    // ArrayList subclass capturing the enclosing instance.
-    ArrayList<String> commands = new ArrayList<>(1);
-    commands.add(cmd);
-
-    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
-    context.setEnvironment(env);
-    context.setTokens(allTokens.duplicate());
-    context.setCommands(commands);
-    context.setLocalResources(localResourceMap);
-
-    log.debug("setting localResourceMap to {}", localResourceMap);
-    log.debug("setting context to {}", context);
-
-    try {
-      nmClient.startContainer(container, context);
-    } catch (YarnException ye) {
-      log.error("Received YarnException when starting container: {}", container.getId(), ye);
-      throw new SamzaContainerLaunchException("Received YarnException when starting container: " + container.getId(), ye);
-    } catch (IOException ioe) {
-      log.error("Received IOException when starting container: {}", container.getId(), ioe);
-      throw new SamzaContainerLaunchException("Received IOException when starting container: " + container.getId(), ioe);
-    }
-  }
-
-  /**
-   * Logs every environment variable of a container, one {@code KEY=value} pair per line.
-   *
-   * @param samzaContainerId  the Samza container Id for logging purposes.
-   * @param env               the Map of environment variables to their respective values.
-   */
-  private void printContainerEnvironmentVariables(String samzaContainerId, Map<String, String> env) {
-    StringBuilder sb = new StringBuilder();
-    for (Map.Entry<String, String> entry : env.entrySet()) {
-      sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue()));
-    }
-    log.info("Container ID {} using environment variables: {}", samzaContainerId, sb.toString());
-  }
-
-  /**
-   * Gets the environment variables from the specified {@link CommandBuilder} and escapes their values
-   * with {@link Util#envVarEscape}.
-   *
-   * @param cmdBuilder        the command builder containing the environment variables.
-   * @return                  a new mutable map containing the escaped environment variables.
-   */
-  private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
-    Map<String, String> env = new HashMap<>();
-    for (Map.Entry<String, String> entry : cmdBuilder.buildEnvironment().entrySet()) {
-      env.put(entry.getKey(), Util.envVarEscape(entry.getValue()));
-    }
-    return env;
-  }
-
-  /**
-   * Builds the shell command line that launches the container process. The command:
-   * <ul>
-   *   <li>exports SAMZA_LOG_DIR;</li>
-   *   <li>optionally chains the job-library export;</li>
-   *   <li>links {@code logs} to the log directory;</li>
-   *   <li>execs the command with stdout/stderr redirected under {@code logs/}.</li>
-   * </ul>
-   *
-   * @param logDirExpansionVar the log directory, used for SAMZA_LOG_DIR and as the {@code logs} link target
-   * @param jobLib             an extra shell statement chained after the SAMZA_LOG_DIR export, or empty for none
-   * @param command            the command to exec
-   * @param stdOut             file name under {@code logs/} that receives stdout
-   * @param stdErr             file name under {@code logs/} that receives stderr
-   * @return the formatted command line
-   */
-  private String getFormattedCommand(String logDirExpansionVar,
-                                     String jobLib,
-                                     String command,
-                                     String stdOut,
-                                     String stdErr) {
-    String jobLibClause = jobLib.isEmpty() ? "" : "&& " + jobLib; // add job's libraries exported to an env variable
-
-    return String
-        .format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar,
-            jobLibClause, logDirExpansionVar, command, stdOut, stdErr);
-  }
-}