You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/07/12 18:28:21 UTC

[1/3] samza git commit: SAMZA-903: Refactor UI state variables

Repository: samza
Updated Branches:
  refs/heads/master 920f803a2 -> 9396ee5cc


http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
deleted file mode 100644
index d747b81..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
+++ /dev/null
@@ -1,502 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.job.yarn.util.MockContainerAllocator;
-import org.apache.samza.job.yarn.util.MockHttpServer;
-import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
-import org.apache.samza.job.yarn.util.TestUtil;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestSamzaTaskManager {
-  private AMRMClientAsyncImpl amRmClientAsync;
-  private TestAMRMClientImpl testAMRMClient;
-
-  private static volatile boolean isRunning = false;
-
-  private Map<String, String> configVals = new HashMap<String, String>()  {
-    {
-      put("yarn.container.count", "1");
-      put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
-      put("yarn.container.memory.mb", "512");
-      put("yarn.package.path", "/foo");
-      put("task.inputs", "test-system.test-stream");
-      put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
-      put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-      put("yarn.container.retry.count", "1");
-      put("yarn.container.retry.window.ms", "1999999999");
-      put("yarn.allocator.sleep.ms", "1");
-      put("yarn.container.request.timeout.ms", "2");
-    }
-  };
-  private Config config = new MapConfig(configVals);
-
-  private Config getConfig() {
-    Map<String, String> map = new HashMap<>();
-    map.putAll(config);
-    return new MapConfig(map);
-  }
-
-  private Config getConfigWithHostAffinity() {
-    Map<String, String> map = new HashMap<>();
-    map.putAll(config);
-    map.put("yarn.samza.host-affinity.enabled", "true");
-    return new MapConfig(map);
-  }
-
-  private SamzaAppState state = null;
-  private HttpServer server = null;
-
-  private JobModelManager getCoordinator(int containerCount) {
-    Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
-    for (int i = 0; i < containerCount; i++) {
-      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
-      containers.put(i, container);
-    }
-    Map<Integer, Map<String, String>> localityMap = new HashMap<>();
-    localityMap.put(0, new HashMap<String, String>(){
-      {
-        put(SetContainerHostMapping.HOST_KEY, "abc");
-      }
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-
-    JobModel jobModel = new JobModel(getConfig(), containers, mockLocalityManager);
-    JobModelManager.jobModelRef().getAndSet(jobModel);
-    return new JobModelManager(jobModel, server, null);
-  }
-
-  @Before
-  public void setup() throws Exception {
-    // Create AMRMClient
-    testAMRMClient = new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-            false,
-            new ArrayList<Container>(),
-            new ArrayList<ContainerStatus>()
-        ));
-    amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
-
-    server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
-
-    // Initialize coordinator url
-    state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2);
-    state.coordinatorUrl = new URL("http://localhost:1234");
-  }
-
-  @After
-  public void teardown() {
-    server.stop();
-  }
-
-  private Field getPrivateFieldFromTaskManager(String fieldName, SamzaTaskManager object) throws Exception {
-    Field field = object.getClass().getDeclaredField(fieldName);
-    field.setAccessible(true);
-    return field;
-  }
-
-  @Test
-  public void testSamzaTaskManager() throws Exception {
-    Map<String, String> conf = new HashMap<>();
-    conf.putAll(getConfig());
-    conf.put("yarn.container.memory.mb", "500");
-    conf.put("yarn.container.cpu.cores", "5");
-
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        new MapConfig(conf),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-        );
-
-    AbstractContainerAllocator allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
-    assertEquals(ContainerAllocator.class, allocator.getClass());
-    // Asserts that samza exposed container configs is honored by allocator thread
-    assertEquals(500, allocator.containerMaxMemoryMb);
-    assertEquals(5, allocator.containerMaxCpuCore);
-
-    conf.clear();
-    conf.putAll(getConfigWithHostAffinity());
-    conf.put("yarn.container.memory.mb", "500");
-    conf.put("yarn.container.cpu.cores", "5");
-
-    taskManager = new SamzaTaskManager(
-        new MapConfig(conf),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-
-    allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
-    assertEquals(HostAwareContainerAllocator.class, allocator.getClass());
-    // Asserts that samza exposed container configs is honored by allocator thread
-    assertEquals(500, allocator.containerMaxMemoryMb);
-    assertEquals(5, allocator.containerMaxCpuCore);
-  }
-
-  @Test
-  public void testContainerConfigsAreHonoredInAllocator() {
-
-  }
-
-  @Test
-  public void testOnInit() throws Exception {
-    Config conf = getConfig();
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        conf,
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-
-    MockContainerAllocator allocator = new MockContainerAllocator(
-        amRmClientAsync,
-        TestUtil.getContainerUtil(getConfig(), state),
-        new YarnConfig(conf));
-    getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
-
-    getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, new Thread() {
-      public void run() {
-        isRunning = true;
-      }
-    });
-
-    taskManager.onInit();
-    Thread.sleep(1000);
-
-    // Verify Allocator thread has started running
-    assertTrue(isRunning);
-
-    // Verify the remaining state
-    assertEquals(1, state.neededContainers.get());
-    assertEquals(1, allocator.requestedContainers);
-
-    taskManager.onShutdown();
-  }
-
-  @Test
-  public void testOnShutdown() throws Exception {
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        getConfig(),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-    taskManager.onInit();
-
-    Thread.sleep(100);
-
-    Thread allocatorThread = (Thread) getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager);
-    assertTrue(allocatorThread.isAlive());
-
-    taskManager.onShutdown();
-
-    Thread.sleep(100);
-    assertFalse(allocatorThread.isAlive());
-
-  }
-
-  /**
-   * Test Task Manager should stop when all containers finish
-   */
-  @Test
-  public void testTaskManagerShouldStopWhenContainersFinish() {
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        getConfig(),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-
-    taskManager.onInit();
-
-    assertFalse(taskManager.shouldShutdown());
-
-    taskManager.onContainerCompleted(TestUtil.getContainerStatus(state.amContainerId, ContainerExitStatus.SUCCESS, ""));
-
-    assertTrue(taskManager.shouldShutdown());
-  }
-
-  /**
-   * Test Task Manager should request a new container when a task fails with unknown exit code
-   * When host-affinity is not enabled, it will always request for ANY_HOST
-   */
-  @Test
-  public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception {
-    Config conf = getConfig();
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        conf,
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-    MockContainerAllocator allocator = new MockContainerAllocator(
-        amRmClientAsync,
-        TestUtil.getContainerUtil(getConfig(), state),
-        new YarnConfig(conf));
-    getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
-
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
-
-    // onInit triggers a request
-    taskManager.onInit();
-
-    assertFalse(taskManager.shouldShutdown());
-    assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size());
-
-    Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
-    taskManager.onContainerAllocated(container);
-
-    // Allow container to run and update state
-    Thread.sleep(300);
-
-    // Create first container failure
-    taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 1, "Expecting a failure here"));
-
-    // The above failure should trigger a container request
-    assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size());
-    assertEquals(ContainerRequestState.ANY_HOST, allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost());
-    assertFalse(taskManager.shouldShutdown());
-    assertFalse(state.jobHealthy.get());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-
-    taskManager.onContainerAllocated(container);
-
-    // Allow container to run and update state
-    Thread.sleep(300);
-
-    assertTrue(state.jobHealthy.get());
-
-    // Create a second failure
-    taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 1, "Expecting a failure here"));
-
-    // The above failure should trigger a job shutdown because our retry count is set to 1
-    assertEquals(0, allocator.getContainerRequestState().getRequestsQueue().size());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-    assertFalse(state.jobHealthy.get());
-    assertTrue(taskManager.shouldShutdown());
-    assertEquals(FinalApplicationStatus.FAILED, state.status);
-
-    taskManager.onShutdown();
-  }
-
-  /**
-   * Test Task Manager should request a new container when a task fails with unknown exit code
-   * When host-affinity is enabled, it will always request for the same host that it was last seen on
-   */
-  @Test
-  public void testSameContainerRequestedOnFailureWithUnknownCode() throws Exception {
-    Config conf = getConfigWithHostAffinity();
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        conf,
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-    MockContainerAllocator allocator = new MockContainerAllocator(
-        amRmClientAsync,
-        TestUtil.getContainerUtil(getConfig(), state),
-        new YarnConfig(conf));
-    getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
-
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
-
-    // onInit triggers a request
-    taskManager.onInit();
-
-    assertFalse(taskManager.shouldShutdown());
-    assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size());
-
-    Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
-    taskManager.onContainerAllocated(container);
-
-    // Allow container to run and update state
-    Thread.sleep(300);
-
-    // Create first container failure
-    taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 1, "Expecting a failure here"));
-
-    // The above failure should trigger a container request
-    assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size());
-    assertEquals("abc", allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost());
-    assertFalse(taskManager.shouldShutdown());
-    assertFalse(state.jobHealthy.get());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-
-    taskManager.onContainerAllocated(container);
-
-    // Allow container to run and update state
-    Thread.sleep(300);
-
-    assertTrue(state.jobHealthy.get());
-
-    // Create a second failure
-    taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), 1, "Expecting a failure here"));
-
-    // The above failure should trigger a job shutdown because our retry count is set to 1
-    assertEquals(0, allocator.getContainerRequestState().getRequestsQueue().size());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-    assertFalse(state.jobHealthy.get());
-    assertTrue(taskManager.shouldShutdown());
-    assertEquals(FinalApplicationStatus.FAILED, state.status);
-
-    taskManager.onShutdown();
-  }
-
-  /**
-   * Test AM requests a new container when a task fails
-   * Error codes with same behavior - Disk failure, preemption and aborted
-   */
-  @Test
-  public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception {
-    Map<String, String> config = new HashMap<>();
-    config.putAll(getConfig());
-    config.remove("yarn.container.retry.count");
-
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        new MapConfig(config),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-    MockContainerAllocator allocator = new MockContainerAllocator(
-        amRmClientAsync,
-        TestUtil.getContainerUtil(getConfig(), state),
-        new YarnConfig(new MapConfig(config)));
-    getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
-
-    Thread thread = new Thread(allocator);
-    getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
-
-    // Start the task manager
-    taskManager.onInit();
-    assertFalse(taskManager.shouldShutdown());
-    assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size());
-
-    Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
-    taskManager.onContainerAllocated(container);
-
-    // Allow container to run and update state
-    Thread.sleep(300);
-
-    // Create container failure - with ContainerExitStatus.DISKS_FAILED
-    taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), ContainerExitStatus.DISKS_FAILED, "Disk failure"));
-
-    // The above failure should trigger a container request
-    assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size());
-    assertFalse(taskManager.shouldShutdown());
-    assertFalse(state.jobHealthy.get());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-    assertEquals(ContainerRequestState.ANY_HOST, allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost());
-
-    // Create container failure - with ContainerExitStatus.PREEMPTED
-    taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), ContainerExitStatus.PREEMPTED, "Task Preempted by RM"));
-
-    // The above failure should trigger a container request
-    assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size());
-    assertFalse(taskManager.shouldShutdown());
-    assertFalse(state.jobHealthy.get());
-    assertEquals(ContainerRequestState.ANY_HOST, allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost());
-
-    // Create container failure - with ContainerExitStatus.ABORTED
-    taskManager.onContainerCompleted(TestUtil.getContainerStatus(container.getId(), ContainerExitStatus.ABORTED, "Task Aborted by the NM"));
-
-    // The above failure should trigger a container request
-    assertEquals(1, allocator.getContainerRequestState().getRequestsQueue().size());
-    assertEquals(2, testAMRMClient.requests.size());
-    assertEquals(0, testAMRMClient.getRelease().size());
-    assertFalse(taskManager.shouldShutdown());
-    assertFalse(state.jobHealthy.get());
-    assertEquals(ContainerRequestState.ANY_HOST, allocator.getContainerRequestState().getRequestsQueue().peek().getPreferredHost());
-
-    taskManager.onShutdown();
-  }
-
-  @Test
-  public void testAppMasterWithFwk () {
-    SamzaTaskManager taskManager = new SamzaTaskManager(
-        new MapConfig(config),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-    taskManager.onInit();
-
-    assertFalse(taskManager.shouldShutdown());
-    ContainerId container2 = ConverterUtils.toContainerId("container_1350670447861_0003_01_000002");
-    taskManager.onContainerAllocated(TestUtil.getContainer(container2, "", 12345));
-
-
-    configVals.put(JobConfig.SAMZA_FWK_PATH(), "/export/content/whatever");
-    Config config1 = new MapConfig(configVals);
-
-    SamzaTaskManager taskManager1 = new SamzaTaskManager(
-        new MapConfig(config1),
-        state,
-        amRmClientAsync,
-        new YarnConfiguration()
-    );
-
-    taskManager1.onInit();
-    taskManager1.onContainerAllocated(TestUtil.getContainer(container2, "", 12345));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
deleted file mode 100644
index 3290247..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn.util;
-
-import java.lang.reflect.Field;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.job.yarn.AbstractContainerAllocator;
-import org.apache.samza.job.yarn.ContainerAllocator;
-import org.apache.samza.job.yarn.ContainerRequestState;
-import org.apache.samza.job.yarn.ContainerUtil;
-
-import java.util.Map;
-
-public class MockContainerAllocator extends ContainerAllocator {
-  public int requestedContainers = 0;
-
-  public MockContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync,
-                                ContainerUtil containerUtil,
-                                YarnConfig yarnConfig) {
-    super(amrmClientAsync, containerUtil, yarnConfig);
-  }
-
-  @Override
-  public void requestContainers(Map<Integer, String> containerToHostMappings) {
-    requestedContainers += containerToHostMappings.size();
-    super.requestContainers(containerToHostMappings);
-  }
-
-  public ContainerRequestState getContainerRequestState() throws Exception {
-    Field field = AbstractContainerAllocator.class.getDeclaredField("containerRequestState");
-    field.setAccessible(true);
-
-    return (ContainerRequestState) field.get(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
deleted file mode 100644
index 7c0b504..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn.util;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.job.yarn.ContainerRequestState;
-import org.apache.samza.job.yarn.SamzaContainerRequest;
-
-
-public class MockContainerRequestState extends ContainerRequestState {
-  private final List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>();
-  private int numAddedContainers = 0;
-  private int numReleasedContainers = 0;
-  private int numAssignedContainers = 0;
-  public Queue<SamzaContainerRequest> assignedRequests = new LinkedList<>();
-
-  public MockContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-      boolean hostAffinityEnabled) {
-    super(amClient, hostAffinityEnabled);
-  }
-
-  @Override
-  public synchronized void updateStateAfterAssignment(SamzaContainerRequest request, String assignedHost, Container container) {
-    super.updateStateAfterAssignment(request, assignedHost, container);
-
-    numAssignedContainers++;
-    assignedRequests.add(request);
-
-    for (MockContainerListener listener : mockContainerListeners) {
-      listener.postUpdateRequestStateAfterAssignment(numAssignedContainers);
-    }
-  }
-
-  @Override
-  public synchronized void addContainer(Container container) {
-    super.addContainer(container);
-
-    numAddedContainers++;
-    for (MockContainerListener listener : mockContainerListeners) {
-      listener.postAddContainer(numAddedContainers);
-    }
-  }
-
-  @Override
-  public synchronized int releaseExtraContainers() {
-    numReleasedContainers += super.releaseExtraContainers();
-
-    for (MockContainerListener listener : mockContainerListeners) {
-      listener.postReleaseContainers(numReleasedContainers);
-    }
-
-    return numAddedContainers;
-  }
-
-  @Override
-  public void releaseUnstartableContainer(Container container) {
-    super.releaseUnstartableContainer(container);
-
-    numReleasedContainers += 1;
-    for (MockContainerListener listener : mockContainerListeners) {
-      listener.postReleaseContainers(numReleasedContainers);
-    }
-  }
-
-  public void registerContainerListener(MockContainerListener listener) {
-    mockContainerListeners.add(listener);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
deleted file mode 100644
index cf3e143..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.samza.config.Config;
-import org.apache.samza.job.yarn.ContainerUtil;
-import org.apache.samza.job.yarn.SamzaAppState;
-import org.apache.samza.job.yarn.SamzaContainerLaunchException;
-
-
-public class MockContainerUtil extends ContainerUtil {
-  private final List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>();
-  public final Map<String, List<Container>> runningContainerList = new HashMap<>();
-  public Exception containerStartException = null;
-
-  public MockContainerUtil(Config config, SamzaAppState state, YarnConfiguration conf, NMClient nmClient) {
-    super(config, state, conf);
-    this.setNmClient(nmClient);
-  }
-
-  @Override
-  public void runContainer(int samzaContainerId, Container container) throws SamzaContainerLaunchException {
-    String hostname = container.getNodeHttpAddress().split(":")[0];
-    List<Container> list = runningContainerList.get(hostname);
-    if (list == null) {
-      list = new ArrayList<Container>();
-      list.add(container);
-      runningContainerList.put(hostname, list);
-    } else {
-      list.add(container);
-      runningContainerList.put(hostname, list);
-    }
-    super.runContainer(samzaContainerId, container);
-
-    for (MockContainerListener listener : mockContainerListeners) {
-      listener.postRunContainer(runningContainerList.size());
-    }
-  }
-
-  @Override
-  public void startContainer(Path packagePath, Container container, Map<String, String> env, String cmd) throws
-                                                                                                         SamzaContainerLaunchException {
-    if (containerStartException != null) {
-      throw new SamzaContainerLaunchException(containerStartException);
-    }
-  }
-
-  public void registerContainerListener(MockContainerListener listener) {
-    mockContainerListeners.add(listener);
-  }
-
-  public void clearContainerListeners() {
-    mockContainerListeners.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java
deleted file mode 100644
index d4c9c96..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn.util;
-
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
-
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
-import org.apache.samza.config.Config;
-import org.apache.samza.job.yarn.ContainerUtil;
-import org.apache.samza.job.yarn.SamzaAppMaster$;
-import org.apache.samza.job.yarn.SamzaAppState;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class TestUtil {
-
-  public static AMRMClientAsyncImpl<ContainerRequest> getAMClient(final TestAMRMClientImpl amClient) {
-    return new AMRMClientAsyncImpl<ContainerRequest>(amClient, 1, SamzaAppMaster$.MODULE$) {
-          public TestAMRMClientImpl getClient() {
-            return amClient;
-          }
-    };
-  }
-
-  public static AllocateResponse getAppMasterResponse(final boolean reboot,
-                                               final List<Container> containers,
-                                               final List<ContainerStatus> completed) {
-    return new AllocateResponse() {
-      @Override
-      public AMCommand getAMCommand() {
-      // Not sure how to throw exception without changing method signature!
-        if (reboot) {
-          try {
-            throw new ApplicationAttemptNotFoundException("Test - out of sync");
-          } catch (ApplicationAttemptNotFoundException e) {
-            return AMCommand.AM_RESYNC;
-          }
-        } else {
-          return null;
-        }
-      }
-
-      @Override
-      public void setAMCommand(AMCommand command) {}
-
-      @Override
-      public int getResponseId() {
-        return 0;
-      }
-
-      @Override
-      public void setResponseId(int responseId) {}
-
-      @Override
-      public List<Container> getAllocatedContainers() {
-        return containers;
-      }
-
-      @Override
-      public void setAllocatedContainers(List<Container> containers) {}
-
-      @Override
-      public Resource getAvailableResources() {
-        return null;
-      }
-
-      @Override
-      public void setAvailableResources(Resource limit) {}
-
-      @Override
-      public List<ContainerStatus> getCompletedContainersStatuses() {
-        return completed;
-      }
-
-      @Override
-      public void setCompletedContainersStatuses(List<ContainerStatus> containers) {}
-
-      @Override
-      public List<NodeReport> getUpdatedNodes() {
-        return new ArrayList<NodeReport>();
-      }
-
-      @Override
-      public void setUpdatedNodes(List<NodeReport> updatedNodes) {}
-
-      @Override
-      public int getNumClusterNodes() {
-        return 1;
-      }
-
-      @Override
-      public void setNumClusterNodes(int numNodes) {
-
-      }
-
-      @Override
-      public PreemptionMessage getPreemptionMessage() {
-        return null;
-      }
-
-      @Override
-      public void setPreemptionMessage(PreemptionMessage request) {}
-
-      @Override
-      public List<NMToken> getNMTokens() {
-        return new ArrayList<NMToken>();
-      }
-
-      @Override
-      public void setNMTokens(List<NMToken> nmTokens) {}
-
-      @Override
-      public List<ContainerResourceIncrease> getIncreasedContainers() {
-        return Collections.<ContainerResourceIncrease>emptyList();
-      }
-
-      @Override
-      public void setIncreasedContainers(List<ContainerResourceIncrease> increasedContainers) {}
-
-      @Override
-      public List<ContainerResourceDecrease> getDecreasedContainers() {
-        return Collections.<ContainerResourceDecrease>emptyList();
-      }
-
-      @Override
-      public void setDecreasedContainers(List<ContainerResourceDecrease> decreasedContainers) {
-
-      }
-
-      @Override
-      public Token getAMRMToken() {
-        return null;
-      }
-
-      @Override
-      public void setAMRMToken(Token amRMToken) {}
-    };
-  }
-
-  public static Container getContainer(final ContainerId containerId, final String host, final int port) {
-    return new Container() {
-      @Override
-      public ContainerId getId() {
-        return containerId;
-      }
-
-      @Override
-      public void setId(ContainerId id) { }
-
-      @Override
-      public NodeId getNodeId() {
-        return NodeId.newInstance(host, port);
-      }
-
-      @Override
-      public void setNodeId(NodeId nodeId) {  }
-
-      @Override
-      public String getNodeHttpAddress() {
-        return host + ":" + port;
-      }
-
-      @Override
-      public void setNodeHttpAddress(String nodeHttpAddress) {  }
-
-      @Override
-      public Resource getResource() {
-        return null;
-      }
-
-      @Override
-      public void setResource(Resource resource) {  }
-
-      @Override
-      public Priority getPriority() {
-        return null;
-      }
-
-      @Override
-      public void setPriority(Priority priority) {  }
-
-      @Override
-      public Token getContainerToken() {
-        return null;
-      }
-
-      @Override
-      public void setContainerToken(Token containerToken) { }
-
-      @Override
-      public int compareTo(Container o) {
-        return containerId.compareTo(o.getId());
-      }
-    };
-  }
-
-  /**
-   * Returns MockContainerUtil instance with a Mock NMClient
-   * */
-  public static ContainerUtil getContainerUtil(Config config, SamzaAppState state) {
-    return new MockContainerUtil(config, state, new YarnConfiguration(), new MockNMClient("Mock NMClient"));
-  }
-
-  public static ContainerStatus getContainerStatus(final ContainerId containerId,
-                                                   final int exitCode,
-                                                   final String diagnostic) {
-    return new ContainerStatus() {
-      @Override
-      public ContainerId getContainerId() {
-        return containerId;
-      }
-
-      @Override
-      public void setContainerId(ContainerId containerId) { }
-
-      @Override
-      public ContainerState getState() {
-        return null;
-      }
-
-      @Override
-      public void setState(ContainerState state) {  }
-
-      @Override
-      public int getExitStatus() {
-        return exitCode;
-      }
-
-      @Override
-      public void setExitStatus(int exitStatus) { }
-
-      @Override
-      public String getDiagnostics() {
-        return diagnostic;
-      }
-
-      @Override
-      public void setDiagnostics(String diagnostics) {  }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
new file mode 100644
index 0000000..d3d34f2
--- /dev/null
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory}
+import scala.collection.JavaConversions._
+
+/**
+ * Mock SystemAdmin that reports numTasks partitions (ids 0 until numTasks, null offsets) for each requested stream.
+ */
+class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
+  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null
+  def getSystemStreamMetadata(streamNames: java.util.Set[String]) = {
+    streamNames.map(streamName => {
+      val partitionMetadata = (0 until numTasks).map(partitionId => {
+        new Partition(partitionId) -> new SystemStreamPartitionMetadata(null, null, null)
+      }).toMap
+      streamName -> new SystemStreamMetadata(streamName, partitionMetadata)
+    }).toMap[String, SystemStreamMetadata]
+  }
+  // Unsupported operations: these throw instead of constructing the exception and silently discarding it.
+  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
+
+  override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
+
+  override def createCoordinatorStream(streamName: String) {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
+
+  override def offsetComparator(offset1: String, offset2: String) = null
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala
new file mode 100644
index 0000000..458400e
--- /dev/null
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemFactory.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import org.apache.samza.config.{JobConfig, Config}
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.SystemFactory
+
+/**
+  * A {@link org.apache.samza.system.SystemFactory} implementation that returns a {@link org.apache.samza.job.yarn.MockSystemAdmin}.
+  */
+
+class MockSystemFactory extends SystemFactory {
+  // Consumers are not supported by this mock; any call fails with a RuntimeException.
+  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) =
+    throw new RuntimeException("Hmm. Not implemented.")
+
+  // Producers are not supported by this mock; any call fails with a RuntimeException.
+  def getProducer(systemName: String, config: Config, registry: MetricsRegistry) =
+    throw new RuntimeException("Hmm. Not implemented.")
+
+  // Returns a MockSystemAdmin built from the container count read out of the job config.
+  // systemName is ignored.
+  def getAdmin(systemName: String, config: Config) =
+    new MockSystemAdmin(new JobConfig(config).getContainerCount)
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
deleted file mode 100644
index 3f056c4..0000000
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
-import org.apache.samza.job.yarn.util.{TestUtil, TestAMRMClientImpl}
-import org.junit.Test
-import org.junit.Assert._
-
-
-class TestSamzaAppMaster {
-  @Test
-  def testAppMasterShouldShutdown {
-    val amClient = TestUtil.getAMClient(
-      new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-          false,
-          new java.util.ArrayList[Container](),
-          new java.util.ArrayList[ContainerStatus]())
-      ))
-    val listener = new YarnAppMasterListener {
-      var init = 0
-      var shutdown = 0
-      var allocated = 0
-      var complete = 0
-      override def shouldShutdown = true
-      override def onInit() {
-        init += 1
-      }
-      override def onShutdown() {
-        shutdown += 1
-      }
-      override def onContainerAllocated(container: Container) {
-        allocated += 1
-      }
-      override def onContainerCompleted(containerStatus: ContainerStatus) {
-        complete += 1
-      }
-    }
-    SamzaAppMaster.listeners = List(listener)
-    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
-    assertEquals(1, listener.init)
-    assertEquals(1, listener.shutdown)
-  }
-
-  @Test
-  def testAppMasterShouldShutdownWithFailingListener {
-    val amClient = TestUtil.getAMClient(
-      new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-          false,
-          new java.util.ArrayList[Container](),
-          new java.util.ArrayList[ContainerStatus]())))
-    val listener1 = new YarnAppMasterListener {
-      var shutdown = 0
-      override def shouldShutdown = true
-      override def onShutdown() {
-        shutdown += 1
-        throw new RuntimeException("Some weird failure")
-      }
-    }
-    val listener2 = new YarnAppMasterListener {
-      var shutdown = 0
-      override def shouldShutdown = true
-      override def onShutdown() {
-        shutdown += 1
-      }
-    }
-    // listener1 will throw an exception in shutdown, and listener2 should still get called
-    SamzaAppMaster.listeners = List(listener1, listener2)
-    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
-    assertEquals(1, listener1.shutdown)
-    assertEquals(1, listener2.shutdown)
-  }
-
-  @Test
-  def testAppMasterShouldShutdownWithInterrupt {
-    val amClient = TestUtil.getAMClient(
-      new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-          false,
-          new java.util.ArrayList[Container](),
-          new java.util.ArrayList[ContainerStatus]())
-      )
-    )
-    val listener = new YarnAppMasterListener {
-      var init = 0
-      var shutdown = 0
-      override def shouldShutdown = false
-      override def onInit() {
-        init += 1
-      }
-      override def onShutdown() {
-        shutdown += 1
-      }
-    }
-    val thread = new Thread {
-      override def run {
-        SamzaAppMaster.listeners = List(listener)
-        SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
-      }
-    }
-    thread.start
-    thread.interrupt
-    thread.join
-    assertEquals(1, listener.init)
-    assertEquals(1, listener.shutdown)
-  }
-
-  @Test
-  def testAppMasterShouldForwardAllocatedAndCompleteContainers {
-    val amClient = TestUtil.getAMClient(
-      new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-          false,
-          new java.util.ArrayList[Container]{ add(TestUtil.getContainer(null, "", 12345));  },
-          new java.util.ArrayList[ContainerStatus]{ add(TestUtil.getContainerStatus(null, 1, null));  })
-      )
-    )
-    val listener = new YarnAppMasterListener {
-      var allocated = 0
-      var complete = 0
-      override def onInit(): Unit = amClient.registerApplicationMaster("", -1, "")
-      override def shouldShutdown = (allocated >= 1 && complete >= 1)
-      override def onContainerAllocated(container: Container) {
-        allocated += 1
-      }
-      override def onContainerCompleted(containerStatus: ContainerStatus) {
-        complete += 1
-      }
-    }
-    SamzaAppMaster.listeners = List(listener)
-    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
-    // heartbeat may be triggered for more than once
-    assertTrue(listener.allocated >= 1)
-    assertTrue(listener.complete >= 1)
-  }
-
-  @Test
-  def testAppMasterShouldReboot {
-    val response: AllocateResponse = getAppMasterResponse(
-      true,
-      new java.util.ArrayList[Container](),
-      new java.util.ArrayList[ContainerStatus]())
-
-    val amClient = TestUtil.getAMClient(
-      new TestAMRMClientImpl(response))
-
-    val listener = new YarnAppMasterListener {
-      var reboot = 0
-      override def onInit(): Unit = amClient.registerApplicationMaster("", -1, "")
-      override def shouldShutdown = reboot >= 1
-      override def onReboot() {
-        reboot += 1
-      }
-    }
-    SamzaAppMaster.listeners = List(listener)
-    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
-    // heartbeat may be triggered for more than once
-    assertTrue(listener.reboot >= 1)
-  }
-
-  /**
-   * This method is necessary because in Yarn 2.6, an RM reboot results in the allocate() method throwing an exception,
-   * rather than invoking AM_RESYNC command. However, we cannot mock out the AllocateResponse class in java because it
-   * will require the getAMCommand() signature to change and allow throwing an exception. This is however allowed in Scala.
-   * Since this is beyond our scope and we don't have a better way to mock the scenario for an RM reboot in our unit
-   * tests, we are keeping the following scala method for now.
-   */
-  def getAppMasterResponse(reboot: Boolean, containers: java.util.List[Container], completed: java.util.List[ContainerStatus]) =
-    new AllocateResponse {
-      override def getResponseId() = 0
-      override def setResponseId(responseId: Int) {}
-      override def getAllocatedContainers() = containers
-      override def setAllocatedContainers(containers: java.util.List[Container]) {}
-      override def getAvailableResources(): Resource = null
-      override def setAvailableResources(limit: Resource) {}
-      override def getCompletedContainersStatuses() = completed
-      override def setCompletedContainersStatuses(containers: java.util.List[ContainerStatus]) {}
-      override def setUpdatedNodes(nodes: java.util.List[NodeReport]) {}
-      override def getUpdatedNodes = new java.util.ArrayList[NodeReport]()
-      override def getNumClusterNodes = 1
-      override def setNumClusterNodes(num: Int) {}
-      override def getNMTokens = new java.util.ArrayList[NMToken]()
-      override def setNMTokens(nmTokens: java.util.List[NMToken]) {}
-      override def setAMCommand(command: AMCommand) {}
-      override def getPreemptionMessage = null
-      override def setPreemptionMessage(request: PreemptionMessage) {}
-      override def getDecreasedContainers(): java.util.List[ContainerResourceDecrease] = java.util.Collections.emptyList[ContainerResourceDecrease]
-      override def getIncreasedContainers(): java.util.List[ContainerResourceIncrease] = java.util.Collections.emptyList[ContainerResourceIncrease]
-      override def setDecreasedContainers(decrease: java.util.List[ContainerResourceDecrease]): Unit = Unit
-      override def setIncreasedContainers(increase: java.util.List[ContainerResourceIncrease]): Unit = Unit
-
-      override def getAMCommand = if (reboot) {
-        throw new ApplicationAttemptNotFoundException("Test - out of sync")
-      } else {
-        null
-      }
-      override def getAMRMToken: Token = null
-      override def setAMRMToken(amRMToken: Token): Unit = {}
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
deleted file mode 100644
index 750f467..0000000
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import java.nio.ByteBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.samza.SamzaException
-import org.junit.Assert._
-import org.junit.Test
-import org.mockito.Mockito
-import java.net.URL
-import org.apache.samza.coordinator.JobModelManager
-
-class TestSamzaAppMasterLifecycle {
-  val coordinator = new JobModelManager(null, null)
-  val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) {
-    var host = ""
-    var port = 0
-    var status: FinalApplicationStatus = null
-    override def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = {
-      this.host = appHostName
-      this.port = appHostPort
-      new RegisterApplicationMasterResponse {
-        override def setApplicationACLs(map: java.util.Map[ApplicationAccessType, String]): Unit = ()
-        override def getApplicationACLs = null
-        override def setMaximumResourceCapability(r: Resource): Unit = ()
-        override def getMaximumResourceCapability = new Resource {
-          def getMemory = 512
-          def getVirtualCores = 2
-          def setMemory(memory: Int) {}
-          def setVirtualCores(vCores: Int) {}
-          def compareTo(o: Resource) = 0
-        }
-        override def getClientToAMTokenMasterKey = null
-        override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {}
-        override def getContainersFromPreviousAttempts(): java.util.List[Container] = java.util.Collections.emptyList[Container]
-        override def getNMTokensFromPreviousAttempts(): java.util.List[NMToken] = java.util.Collections.emptyList[NMToken]
-        override def getQueue(): String = null
-        override def setContainersFromPreviousAttempts(containers: java.util.List[Container]): Unit = Unit
-        override def setNMTokensFromPreviousAttempts(nmTokens: java.util.List[NMToken]): Unit = Unit
-        override def setQueue(queue: String): Unit = Unit
-
-        override def setSchedulerResourceTypes(types: java.util.EnumSet[SchedulerResourceTypes]): Unit = {}
-        override def getSchedulerResourceTypes: java.util.EnumSet[SchedulerResourceTypes] = null
-      }
-    }
-    override def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
-      appMessage: String,
-      appTrackingUrl: String) {
-      this.status = appStatus
-    }
-    override def releaseAssignedContainer(containerId: ContainerId) {}
-    override def getClusterNodeCount() = 1
-
-    override def serviceInit(config: Configuration) {}
-    override def serviceStart() {}
-    override def serviceStop() {}
-  }
-
-  @Test
-  def testLifecycleShouldRegisterOnInit {
-    val state = new SamzaAppState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
-    state.rpcUrl = new URL("http://localhost:1")
-    state.trackingUrl = new URL("http://localhost:2")
-    val saml = new SamzaAppMasterLifecycle(512, 2, state, amClient)
-    saml.onInit
-    assertEquals("test", amClient.host)
-    assertEquals(1, amClient.port)
-    assertFalse(saml.shouldShutdown)
-  }
-
-  @Test
-  def testLifecycleShouldUnregisterOnShutdown {
-    val state = new SamzaAppState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
-    state.status = FinalApplicationStatus.SUCCEEDED
-    new SamzaAppMasterLifecycle(128, 1, state, amClient).onShutdown
-    assertEquals(FinalApplicationStatus.SUCCEEDED, amClient.status)
-  }
-
-  @Test
-  def testLifecycleShouldThrowAnExceptionOnReboot {
-    var gotException = false
-    try {
-      new SamzaAppMasterLifecycle(368, 1, null, amClient).onReboot
-    } catch {
-      // expected
-      case e: SamzaException => gotException = true
-    }
-    assertTrue(gotException)
-  }
-
-  @Test
-  def testLifecycleShouldShutdownOnInvalidContainerSettings {
-    val state = new SamzaAppState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
-    state.rpcUrl = new URL("http://localhost:1")
-    state.trackingUrl = new URL("http://localhost:2")
-    List(new SamzaAppMasterLifecycle(768, 1, state, amClient),
-      new SamzaAppMasterLifecycle(368, 3, state, amClient)).map(saml => {
-        saml.onInit
-        assertTrue(saml.shouldShutdown)
-      })
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
deleted file mode 100644
index 3de5614..0000000
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import java.io.BufferedReader
-import java.net.URL
-import java.io.InputStreamReader
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.samza.Partition
-import org.apache.samza.config.MapConfig
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory}
-import org.junit.Assert._
-import org.junit.Test
-import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.container.TaskName
-import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
-
-class TestSamzaAppMasterService {
-
-  @Test
-  def testAppMasterDashboardShouldStart {
-    val config = getDummyConfig
-    val state = new SamzaAppState(JobModelManager(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2)
-    val service = new SamzaAppMasterService(config, state, null, null, null)
-    val taskName = new TaskName("test")
-
-    // start the dashboard
-    service.onInit
-    assertTrue(state.rpcUrl.getPort > 0)
-    assertTrue(state.trackingUrl.getPort > 0)
-    assertTrue(state.coordinatorUrl.getPort > 0)
-
-    // check to see if it's running
-    val url = new URL(state.rpcUrl.toString + "am")
-    val is = url.openConnection().getInputStream();
-    val reader = new BufferedReader(new InputStreamReader(is));
-    var line: String = null;
-
-    do {
-      line = reader.readLine()
-    } while (line != null)
-
-    reader.close();
-  }
-
-  /**
-   * This tests the rendering of the index.scaml file containing some Scala code. The objective
-   * is to ensure that the rendered scala code builds correctly
-   */
-  @Test
-  def testAppMasterDashboardWebServiceShouldStart {
-    // Create some dummy config
-    val config = getDummyConfig
-    val state = new SamzaAppState(JobModelManager(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2)
-    val service = new SamzaAppMasterService(config, state, null, null, null)
-
-    // start the dashboard
-    service.onInit
-    assertTrue(state.rpcUrl.getPort > 0)
-    assertTrue(state.trackingUrl.getPort > 0)
-
-    // Do a GET Request on the tracking port: This in turn will render index.scaml
-    val url = state.trackingUrl
-    val is = url.openConnection().getInputStream()
-    val reader = new BufferedReader(new InputStreamReader(is))
-    var line: String = null
-
-    do {
-      line = reader.readLine()
-    } while (line != null)
-
-    reader.close
-  }
-
-  private def getDummyConfig: Config = new MapConfig(Map[String, String](
-    "job.name" -> "test",
-    "yarn.container.count" -> "1",
-    "systems.test-system.samza.factory" -> "org.apache.samza.job.yarn.MockSystemFactory",
-    "yarn.container.memory.mb" -> "512",
-    "yarn.package.path" -> "/foo",
-    "task.inputs" -> "test-system.test-stream",
-    "systems.test-system.samza.key.serde" -> "org.apache.samza.serializers.JsonSerde",
-    "systems.test-system.samza.msg.serde" -> "org.apache.samza.serializers.JsonSerde",
-    "yarn.container.retry.count" -> "1",
-    "yarn.container.retry.window.ms" -> "1999999999",
-    "job.coordinator.system" -> "coordinator",
-    "systems.coordinator.samza.factory" -> classOf[MockCoordinatorStreamSystemFactory].getCanonicalName))
-}
-
-class MockSystemFactory extends SystemFactory {
-  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
-    throw new RuntimeException("Hmm. Not implemented.")
-  }
-
-  def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
-    throw new RuntimeException("Hmm. Not implemented.")
-  }
-
-  def getAdmin(systemName: String, config: Config) = {
-    new MockSystemAdmin(config.getContainerCount)
-  }
-}
-
-/**
- * Helper class that returns metadata for each stream that contains numTasks partitions in it.
- */
-class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
-  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null
-  def getSystemStreamMetadata(streamNames: java.util.Set[String]) = {
-    streamNames.map(streamName => {
-      var partitionMetadata = (0 until numTasks).map(partitionId => {
-        new Partition(partitionId) -> new SystemStreamPartitionMetadata(null, null, null)
-      }).toMap
-      streamName -> new SystemStreamMetadata(streamName, partitionMetadata)
-    }).toMap[String, SystemStreamMetadata]
-  }
-
-  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
-  override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
-  override def createCoordinatorStream(streamName: String) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
-  override def offsetComparator(offset1: String, offset2: String) = null
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala
new file mode 100644
index 0000000..2664e41
--- /dev/null
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import java.net.URL
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.samza.SamzaException
+import org.apache.samza.clustermanager.SamzaApplicationState
+import org.apache.samza.clustermanager.SamzaApplicationState.SamzaAppStatus
+import org.apache.samza.coordinator.JobModelManager
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito
+
+class TestSamzaYarnAppMasterLifecycle {
+  val coordinator = new JobModelManager(null, null)
+  val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) {
+    var host = ""
+    var port = 0
+    var status: FinalApplicationStatus = null
+    override def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = {
+      this.host = appHostName
+      this.port = appHostPort
+      new RegisterApplicationMasterResponse {
+        override def setApplicationACLs(map: java.util.Map[ApplicationAccessType, String]): Unit = ()
+        override def getApplicationACLs = null
+        override def setMaximumResourceCapability(r: Resource): Unit = ()
+        override def getMaximumResourceCapability = new Resource {
+          def getMemory = 512
+          def getVirtualCores = 2
+          def setMemory(memory: Int) {}
+          def setVirtualCores(vCores: Int) {}
+          def compareTo(o: Resource) = 0
+        }
+        override def getClientToAMTokenMasterKey = null
+        override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {}
+        override def getContainersFromPreviousAttempts(): java.util.List[Container] = java.util.Collections.emptyList[Container]
+        override def getNMTokensFromPreviousAttempts(): java.util.List[NMToken] = java.util.Collections.emptyList[NMToken]
+        override def getQueue(): String = null
+        override def setContainersFromPreviousAttempts(containers: java.util.List[Container]): Unit = Unit
+        override def setNMTokensFromPreviousAttempts(nmTokens: java.util.List[NMToken]): Unit = Unit
+        override def setQueue(queue: String): Unit = Unit
+
+        override def setSchedulerResourceTypes(types: java.util.EnumSet[SchedulerResourceTypes]): Unit = {}
+        override def getSchedulerResourceTypes: java.util.EnumSet[SchedulerResourceTypes] = null
+      }
+    }
+    override def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
+      appMessage: String,
+      appTrackingUrl: String) {
+      this.status = appStatus
+    }
+    override def releaseAssignedContainer(containerId: ContainerId) {}
+    override def getClusterNodeCount() = 1
+
+    override def serviceInit(config: Configuration) {}
+    override def serviceStart() {}
+    override def serviceStop() {}
+  }
+
+  @Test
+  def testLifecycleShouldRegisterOnInit {
+    val state = new SamzaApplicationState(coordinator)
+
+    val yarnState = new YarnAppState(1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "testHost", 1, 2);
+    yarnState.rpcUrl = new URL("http://localhost:1")
+    yarnState.trackingUrl = new URL("http://localhost:2")
+
+    val saml = new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient)
+    saml.onInit
+    assertEquals("testHost", amClient.host)
+    assertEquals(1, amClient.port)
+    assertFalse(saml.shouldShutdown)
+  }
+
+  @Test
+  def testLifecycleShouldUnregisterOnShutdown {
+    val state = new SamzaApplicationState(coordinator)
+
+    val yarnState =  new YarnAppState(1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "testHost", 1, 2);
+    new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient).onShutdown (SamzaAppStatus.SUCCEEDED)
+    assertEquals(FinalApplicationStatus.SUCCEEDED, amClient.status)
+  }
+
+  @Test
+  def testLifecycleShouldThrowAnExceptionOnReboot {
+    var gotException = false
+    try {
+      val state = new SamzaApplicationState(coordinator)
+
+      val yarnState =  new YarnAppState(1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "testHost", 1, 2);
+      new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient).onReboot()
+    } catch {
+      // expected
+      case e: SamzaException => gotException = true
+    }
+    assertTrue(gotException)
+  }
+
+  @Test
+  def testLifecycleShouldShutdownOnInvalidContainerSettings {
+    val state = new SamzaApplicationState(coordinator)
+
+    val yarnState =  new YarnAppState(1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "testHost", 1, 2);
+    yarnState.rpcUrl = new URL("http://localhost:1")
+    yarnState.trackingUrl = new URL("http://localhost:2")
+
+    //Request a higher amount of memory from yarn.
+    List(new SamzaYarnAppMasterLifecycle(768, 1, state, yarnState, amClient),
+    //Request a higher number of cores from yarn.
+      new SamzaYarnAppMasterLifecycle(368, 3, state, yarnState, amClient)).map(saml => {
+        saml.onInit
+        assertTrue(saml.shouldShutdown)
+      })
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
new file mode 100644
index 0000000..1dd0c18
--- /dev/null
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import java.io.BufferedReader
+import java.net.URL
+import java.io.InputStreamReader
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.samza.Partition
+import org.apache.samza.clustermanager.SamzaApplicationState
+import org.apache.samza.config.MapConfig
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory}
+import org.junit.Assert._
+import org.junit.Test
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.container.TaskName
+import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+
+class TestSamzaYarnAppMasterService {
+
+  @Test
+  def testAppMasterDashboardShouldStart {
+    val config = getDummyConfig
+    val jobModelManager = JobModelManager(config)
+    val samzaState = new SamzaApplicationState(jobModelManager)
+
+    val state = new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "testHost", 1, 1);
+    val service = new SamzaYarnAppMasterService(config, samzaState, state, null, null)
+    val taskName = new TaskName("test")
+
+    // start the dashboard
+    service.onInit
+    assertTrue(state.rpcUrl.getPort > 0)
+    assertTrue(state.trackingUrl.getPort > 0)
+    assertTrue(state.coordinatorUrl.getPort > 0)
+
+    // check to see if it's running
+    val url = new URL(state.rpcUrl.toString + "am")
+    val is = url.openConnection().getInputStream();
+    val reader = new BufferedReader(new InputStreamReader(is));
+    var line: String = null;
+
+    do {
+      line = reader.readLine()
+    } while (line != null)
+
+    reader.close();
+  }
+
+  /**
+   * This tests the rendering of the index.scaml file containing some Scala code. The objective
+   * is to ensure that the rendered scala code builds correctly
+   */
+  @Test
+  def testAppMasterDashboardWebServiceShouldStart {
+    // Create some dummy config
+    val config = getDummyConfig
+    val jobModelManager = JobModelManager(config)
+    val samzaState = new SamzaApplicationState(jobModelManager)
+    val state = new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "testHost", 1, 1);
+
+    val service = new SamzaYarnAppMasterService(config, samzaState, state, null, null)
+
+    // start the dashboard
+    service.onInit
+    assertTrue(state.rpcUrl.getPort > 0)
+    assertTrue(state.trackingUrl.getPort > 0)
+
+    // Do a GET Request on the tracking port: This in turn will render index.scaml
+    val url = state.trackingUrl
+    val is = url.openConnection().getInputStream()
+    val reader = new BufferedReader(new InputStreamReader(is))
+    var line: String = null
+
+    do {
+      line = reader.readLine()
+    } while (line != null)
+
+    reader.close
+  }
+
+  private def getDummyConfig: Config = new MapConfig(Map[String, String](
+    "job.name" -> "test",
+    "yarn.container.count" -> "1",
+    "systems.test-system.samza.factory" -> "org.apache.samza.job.yarn.MockSystemFactory",
+    "yarn.container.memory.mb" -> "512",
+    "yarn.package.path" -> "/foo",
+    "task.inputs" -> "test-system.test-stream",
+    "systems.test-system.samza.key.serde" -> "org.apache.samza.serializers.JsonSerde",
+    "systems.test-system.samza.msg.serde" -> "org.apache.samza.serializers.JsonSerde",
+    "yarn.container.retry.count" -> "1",
+    "yarn.container.retry.window.ms" -> "1999999999",
+    "job.coordinator.system" -> "coordinator",
+    "systems.coordinator.samza.factory" -> classOf[MockCoordinatorStreamSystemFactory].getCanonicalName))
+}
+
+
+
+


[3/3] samza git commit: SAMZA-903: Refactor UI state variables

Posted by ni...@apache.org.
SAMZA-903: Refactor UI state variables


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9396ee5c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9396ee5c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9396ee5c

Branch: refs/heads/master
Commit: 9396ee5cc0a35e4e32844547eacebb24ae971c67
Parents: 920f803
Author: Jagadish Venkatraman <ja...@gmail.com>
Authored: Mon Jul 11 17:10:03 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Jul 12 10:08:51 2016 -0700

----------------------------------------------------------------------
 .../AbstractContainerAllocator.java             |   2 +-
 .../clustermanager/ContainerProcessManager.java |   8 +-
 .../clustermanager/SamzaApplicationState.java   |   8 +-
 .../ContainerProcessManagerMetrics.scala        |   2 +-
 .../clustermanager/TestContainerAllocator.java  |   4 +-
 .../TestContainerProcessManager.java            |   2 +-
 .../TestHostAwareContainerAllocator.java        |   4 +-
 samza-shell/src/main/bash/run-am.sh             |  25 -
 .../job/yarn/AbstractContainerAllocator.java    | 227 ---------
 .../samza/job/yarn/ContainerAllocator.java      |  54 --
 .../apache/samza/job/yarn/ContainerFailure.java |  48 --
 .../samza/job/yarn/ContainerRequestState.java   | 283 -----------
 .../apache/samza/job/yarn/ContainerUtil.java    | 267 ----------
 .../job/yarn/HostAwareContainerAllocator.java   |  89 ----
 .../apache/samza/job/yarn/SamzaAppState.java    | 212 --------
 .../samza/job/yarn/SamzaContainerRequest.java   | 113 -----
 .../apache/samza/job/yarn/SamzaTaskManager.java | 285 -----------
 .../org/apache/samza/job/yarn/YarnAppState.java |  15 +-
 .../job/yarn/YarnClusterResourceManager.java    |  38 +-
 .../resources/scalate/WEB-INF/views/index.scaml |  29 +-
 .../apache/samza/job/yarn/SamzaAppMaster.scala  | 172 -------
 .../job/yarn/SamzaAppMasterLifecycle.scala      |  67 ---
 .../samza/job/yarn/SamzaAppMasterMetrics.scala  |  96 ----
 .../samza/job/yarn/SamzaAppMasterService.scala  | 101 ----
 .../job/yarn/SamzaYarnAppMasterLifecycle.scala  |   6 +-
 .../job/yarn/SamzaYarnAppMasterService.scala    |  46 +-
 .../org/apache/samza/job/yarn/YarnJob.scala     |   4 +-
 .../webapp/ApplicationMasterRestServlet.scala   |   9 +-
 .../webapp/ApplicationMasterWebServlet.scala    |   6 +-
 .../samza/job/yarn/TestContainerAllocator.java  | 137 -----
 .../job/yarn/TestContainerAllocatorCommon.java  | 225 ---------
 .../job/yarn/TestContainerRequestState.java     | 221 --------
 .../yarn/TestHostAwareContainerAllocator.java   | 253 ----------
 .../job/yarn/TestSamzaContainerRequest.java     |  52 --
 .../samza/job/yarn/TestSamzaTaskManager.java    | 502 -------------------
 .../job/yarn/util/MockContainerAllocator.java   |  53 --
 .../yarn/util/MockContainerRequestState.java    |  90 ----
 .../samza/job/yarn/util/MockContainerUtil.java  |  79 ---
 .../apache/samza/job/yarn/util/TestUtil.java    | 263 ----------
 .../apache/samza/job/yarn/MockSystemAdmin.scala |  54 ++
 .../samza/job/yarn/MockSystemFactory.scala      |  43 ++
 .../samza/job/yarn/TestSamzaAppMaster.scala     | 223 --------
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  | 128 -----
 .../job/yarn/TestSamzaAppMasterService.scala    | 153 ------
 .../yarn/TestSamzaYarnAppMasterLifecycle.scala  | 142 ++++++
 .../yarn/TestSamzaYarnAppMasterService.scala    | 121 +++++
 46 files changed, 475 insertions(+), 4486 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index 097a476..d47f217 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -155,7 +155,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
       //launches a StreamProcessor on the resource
       clusterResourceManager.launchStreamProcessor(resource, builder);
 
-      if (state.neededResources.decrementAndGet() == 0) {
+      if (state.neededContainers.decrementAndGet() == 0) {
         state.jobHealthy.set(true);
       }
       state.runningContainers.put(request.getContainerID(), resource);

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index f2db34c..c6bfec0 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -169,7 +169,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     final int containerCount = jobConfig.getContainerCount();
 
     state.containerCount.set(containerCount);
-    state.neededResources.set(containerCount);
+    state.neededContainers.set(containerCount);
 
     // Request initial set of containers
     Map<Integer, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
@@ -249,7 +249,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         state.completedContainers.incrementAndGet();
 
         if (containerId != -1) {
-          state.finishedContainers.add(containerId);
+          state.finishedContainers.incrementAndGet();
           containerFailures.remove(containerId);
         }
 
@@ -277,7 +277,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         if (containerId != -1) {
           log.info("Released container {} was assigned task group ID {}. Requesting a refactor container for the task group.", containerIdStr, containerId);
 
-          state.neededResources.incrementAndGet();
+          state.neededContainers.incrementAndGet();
           state.jobHealthy.set(false);
 
           // request a container on a new host
@@ -295,7 +295,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         state.jobHealthy.set(false);
 
         if (containerId != -1) {
-          state.neededResources.incrementAndGet();
+          state.neededContainers.incrementAndGet();
           // Find out previously running container location
           String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
           if (!hostAffinityEnabled || lastSeenOn == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index ca277b3..cf91044 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -21,8 +21,6 @@ package org.apache.samza.clustermanager;
 
 import org.apache.samza.coordinator.JobModelManager;
 
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -87,15 +85,15 @@ public class SamzaApplicationState {
   public final AtomicInteger containerCount = new AtomicInteger(0);
 
   /**
-   * Set of finished containers - TODO: Can be changed to a counter
+   * Number of finished containers
    */
-  public final Set<Integer> finishedContainers = new HashSet<Integer>();
+  public final AtomicInteger finishedContainers = new AtomicInteger(0);
 
   /**
    *  Number of containers needed for the job to be declared healthy
    *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
    */
-  public final AtomicInteger neededResources = new AtomicInteger(0);
+  public final AtomicInteger neededContainers = new AtomicInteger(0);
 
   /**
    *  Map of the samzaContainerId to the {@link SamzaResource} on which it is running

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 86c2440..f24beb1 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -56,7 +56,7 @@ class ContainerProcessManagerMetrics(
 
    def start() {
     val mRunningContainers = newGauge("running-containers", () => state.runningContainers.size)
-    val mNeededContainers = newGauge("needed-containers", () => state.neededResources.get())
+    val mNeededContainers = newGauge("needed-containers", () => state.neededContainers.get())
     val mCompletedContainers = newGauge("completed-containers", () => state.completedContainers.get())
     val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
     val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index f147570..5351bc3 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -254,11 +254,11 @@ public class TestContainerAllocator {
 
             // This routine should be called after the retry is assigned, but before it's started.
             // So there should still be 1 container needed.
-            assertEquals(1, state.neededResources.get());
+            assertEquals(1, state.neededContainers.get());
           }
         }, null
     );
-    state.neededResources.set(1);
+    state.neededContainers.set(1);
     requestState.registerContainerListener(listener);
 
     containerAllocator.requestResource(0, "2");

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 4fd1018..57a5da6 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -192,7 +192,7 @@ public class TestContainerProcessManager {
     assertTrue(isRunning);
 
     // Verify the remaining state
-    assertEquals(1, state.neededResources.get());
+    assertEquals(1, state.neededContainers.get());
     assertEquals(1, allocator.requestedContainers);
 
     taskManager.stop();

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index 57fef12..b6651f2 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -230,11 +230,11 @@ public class TestHostAwareContainerAllocator {
 
             // This routine should be called after the retry is assigned, but before it's started.
             // So there should still be 1 container needed.
-            assertEquals(1, state.neededResources.get());
+            assertEquals(1, state.neededContainers.get());
           }
         }, null
     );
-    state.neededResources.set(1);
+    state.neededContainers.set(1);
     requestState.registerContainerListener(listener);
 
     // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-shell/src/main/bash/run-am.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-am.sh b/samza-shell/src/main/bash/run-am.sh
deleted file mode 100755
index ca938cc..0000000
--- a/samza-shell/src/main/bash/run-am.sh
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Check if server is set. If not - set server optimization
-[[ $JAVA_OPTS != *-server* ]] && export JAVA_OPTS="$JAVA_OPTS -server"
-
-# Set container name system properties for use in Log4J
-[[ $JAVA_OPTS != *-Dsamza.container.name* ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-application-master"
-
-exec $(dirname $0)/run-class.sh org.apache.samza.job.yarn.SamzaAppMaster "$@"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
deleted file mode 100644
index b4789e6..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.util.List;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.YarnConfig;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This class is responsible for making requests for containers to the AM and also, assigning a container to run on an allocated resource.
- *
- * Since we are using a simple thread based allocation of a container to an allocated resource, the subclasses should implement {@link java.lang.Runnable} interface.
- * The allocator thread follows the lifecycle of the {@link org.apache.samza.job.yarn.SamzaTaskManager}. Depending on whether host-affinity is enabled or not, the allocation model varies.
- *
- * See {@link org.apache.samza.job.yarn.ContainerAllocator} and {@link org.apache.samza.job.yarn.HostAwareContainerAllocator}
- */
-public abstract class AbstractContainerAllocator implements Runnable {
-  private static final Logger log = LoggerFactory.getLogger(AbstractContainerAllocator.class);
-
-  public static final String ANY_HOST = ContainerRequestState.ANY_HOST;
-  public static final int DEFAULT_PRIORITY = 0;
-  public static final int DEFAULT_CONTAINER_MEM = 1024;
-  public static final int DEFAULT_CPU_CORES = 1;
-
-  protected final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
-  protected final int ALLOCATOR_SLEEP_TIME;
-  protected final ContainerUtil containerUtil;
-  protected final int containerMaxMemoryMb;
-  protected final int containerMaxCpuCore;
-
-  // containerRequestState indicate the state of all unfulfilled container requests and allocated containers
-  private final ContainerRequestState containerRequestState;
-
-  // state that controls the lifecycle of the allocator thread
-  private AtomicBoolean isRunning = new AtomicBoolean(true);
-
-  public AbstractContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-                            ContainerUtil containerUtil,
-                            ContainerRequestState containerRequestState,
-                            YarnConfig yarnConfig) {
-    this.amClient = amClient;
-    this.containerUtil = containerUtil;
-    this.ALLOCATOR_SLEEP_TIME = yarnConfig.getAllocatorSleepTime();
-    this.containerRequestState = containerRequestState;
-    this.containerMaxMemoryMb = yarnConfig.getContainerMaxMemoryMb();
-    this.containerMaxCpuCore = yarnConfig.getContainerMaxCpuCores();
-  }
-
-  /**
-   * Continuously assigns requested containers to the allocated containers provided by the cluster manager.
-   * The loop frequency is governed by thread sleeps for ALLOCATOR_SLEEP_TIME ms.
-   *
-   * Terminates when the isRunning flag is cleared.
-   */
-  @Override
-  public void run() {
-    while(isRunning.get()) {
-      try {
-        assignContainerRequests();
-
-        // Release extra containers and update the entire system's state
-        containerRequestState.releaseExtraContainers();
-
-        Thread.sleep(ALLOCATOR_SLEEP_TIME);
-      } catch (InterruptedException e) {
-        log.info("Got InterruptedException in AllocatorThread.", e);
-      } catch (Exception e) {
-        log.error("Got unknown Exception in AllocatorThread.", e);
-      }
-    }
-  }
-
-  /**
-   * Assigns the container requests from the queue to the allocated containers from the cluster manager and
-   * runs them.
-   */
-  protected abstract void assignContainerRequests();
-
-  /**
-   * Updates the request state and runs the container on the specified host. Assumes a container
-   * is available on the preferred host, so the caller must verify that before invoking this method.
-   *
-   * @param request             the {@link SamzaContainerRequest} which is being handled.
-   * @param preferredHost       the preferred host on which the container should be run or
-   *                            {@link ContainerRequestState#ANY_HOST} if there is no host preference.
-   */
-  protected void runContainer(SamzaContainerRequest request, String preferredHost) {
-    // Get the available container
-    Container container = peekAllocatedContainer(preferredHost);
-    if (container == null)
-      throw new SamzaException("Expected container was unavailable on host " + preferredHost);
-
-    // Update state
-    containerRequestState.updateStateAfterAssignment(request, preferredHost, container);
-    int expectedContainerId = request.expectedContainerId;
-
-    // Cancel request and run container
-    log.info("Found available containers on {}. Assigning request for container_id {} with "
-            + "timestamp {} to container {}",
-        new Object[]{preferredHost, String.valueOf(expectedContainerId), request.getRequestTimestamp(), container.getId()});
-    try {
-      if (preferredHost.equals(ANY_HOST)) {
-        containerUtil.runContainer(expectedContainerId, container);
-      } else {
-        containerUtil.runMatchedContainer(expectedContainerId, container);
-      }
-    } catch (SamzaContainerLaunchException e) {
-      log.warn(String.format("Got exception while starting container %s. Requesting a new container on any host", container), e);
-      containerRequestState.releaseUnstartableContainer(container);
-      requestContainer(expectedContainerId, ContainerAllocator.ANY_HOST);
-    }
-  }
-
-  /**
-   * Called during initial request for containers
-   *
-   * @param containerToHostMappings Map of containerId to its last seen host (locality).
-   *                                The locality value is null, either
-   *                                - when host-affinity is not enabled, or
-   *                                - when host-affinity is enabled and job is run for the first time
-   */
-  public void requestContainers(Map<Integer, String> containerToHostMappings) {
-    for (Map.Entry<Integer, String> entry : containerToHostMappings.entrySet()) {
-      int containerId = entry.getKey();
-      String preferredHost = entry.getValue();
-      if (preferredHost == null)
-        preferredHost = ANY_HOST;
-
-      requestContainer(containerId, preferredHost);
-    }
-  }
-  /**
-   * Method to request a container resource from yarn
-   *
-   * @param expectedContainerId Identifier of the container that will be run when a container resource is allocated for
-   *                            this request
-   * @param preferredHost Name of the host that you prefer to run the container on
-   */
-  public final void requestContainer(int expectedContainerId, String preferredHost) {
-    SamzaContainerRequest request = new SamzaContainerRequest(
-        containerMaxMemoryMb,
-        containerMaxCpuCore,
-        DEFAULT_PRIORITY,
-        expectedContainerId,
-        preferredHost);
-    containerRequestState.updateRequestState(request);
-    containerUtil.incrementContainerRequests();
-  }
-
-  /**
-   * @return {@code true} if there is a pending request, {@code false} otherwise.
-   */
-  protected boolean hasPendingRequest() {
-    return peekPendingRequest() != null;
-  }
-
-  /**
-   * Retrieves, but does not remove, the next pending request in the queue.
-   *
-   * @return  the pending request or {@code null} if there is no pending request.
-   */
-  protected SamzaContainerRequest peekPendingRequest() {
-    return containerRequestState.getRequestsQueue().peek();
-  }
-
-  /**
-   * Method that adds allocated container to a synchronized buffer of allocated containers list
-   * See allocatedContainers in {@link org.apache.samza.job.yarn.ContainerRequestState}
-   *
-   * @param container Container resource returned by the RM
-   */
-  public final void addContainer(Container container) {
-    containerRequestState.addContainer(container);
-  }
-
-  /**
-   * @param host  the host for which a container is needed.
-   * @return      {@code true} if there is a container allocated for the specified host, {@code false} otherwise.
-   */
-  protected boolean hasAllocatedContainer(String host) {
-    return peekAllocatedContainer(host) != null;
-  }
-
-  /**
-   * Retrieves, but does not remove, the first allocated container on the specified host.
-   *
-   * @param host  the host for which a container is needed.
-   * @return      the first {@link Container} allocated for the specified host or {@code null} if there isn't one.
-   */
-  protected Container peekAllocatedContainer(String host) {
-    List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(host);
-    if (allocatedContainers == null || allocatedContainers.isEmpty()) {
-      return null;
-    }
-
-    return allocatedContainers.get(0);
-  }
-
-  public final void setIsRunning(boolean state) {
-    isRunning.set(state);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
deleted file mode 100644
index 24ac410..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.util.List;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.config.YarnConfig;
-
-/**
- * This is the default allocator thread that will be used by SamzaTaskManager.
- *
- * When host-affinity is not enabled, this thread periodically wakes up to assign a container to an allocated resource.
- * If there aren't enough containers, it waits by sleeping for {@code ALLOCATOR_SLEEP_TIME} milliseconds.
- */
-public class ContainerAllocator extends AbstractContainerAllocator {
-  public ContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-                            ContainerUtil containerUtil,
-                            YarnConfig yarnConfig) {
-    super(amClient, containerUtil, new ContainerRequestState(amClient, false), yarnConfig);
-  }
-
-  /**
-   * This method tries to allocate any unsatisfied request that is still in the request queue
-   * (See requests in {@link org.apache.samza.job.yarn.ContainerRequestState})
-   * with allocated containers, if any.
-   *
-   * Since host-affinity is not enabled, all allocated container resources are buffered in the list keyed by "ANY_HOST".
-   * */
-  @Override
-  public void assignContainerRequests() {
-    while (hasPendingRequest() && hasAllocatedContainer(ANY_HOST)) {
-      SamzaContainerRequest request = peekPendingRequest();
-      runContainer(request, ANY_HOST);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java
deleted file mode 100644
index 1d15651..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-/**
- * Class that encapsulates information related to a container failure
- * */
-public class ContainerFailure {
-  /**
-   * Number of times a container has failed
-   * */
-  private int count;
-  /**
-   * Latest failure time of the container
-   * */
-  private Long lastFailure;
-
-  public ContainerFailure(int count,
-                          Long lastFailure) {
-    this.count = count;
-    this.lastFailure = lastFailure;
-  }
-
-  public int getCount() {
-    return count;
-  }
-
-  public Long getLastFailure() {
-    return lastFailure;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
deleted file mode 100644
index 57ce350..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class maintains the state variables for all the container requests and the allocated containers returned
- * by the RM
- * Important: Even though we use concurrent data structures, this class is not thread-safe. Thread safety has to be
- * handled by the caller.
- */
-public class ContainerRequestState {
-  private static final Logger log = LoggerFactory.getLogger(ContainerRequestState.class);
-  public static final String ANY_HOST = "ANY_HOST";
-
-  /**
-   * Maintain a map of hostname to a list of containers allocated on this host
-   */
-  private final ConcurrentHashMap<String, List<Container>> allocatedContainers = new ConcurrentHashMap<String, List<Container>>();
-  /**
-   * Represents the queue of container requests made by the {@link org.apache.samza.job.yarn.SamzaTaskManager}
-   */
-  private final PriorityBlockingQueue<SamzaContainerRequest> requestsQueue = new PriorityBlockingQueue<SamzaContainerRequest>();
-  /**
-   * Maintain a map of hostname to the number of requests made for containers on this host
-   * This state variable is used to look-up whether an allocated container on a host was ever requested in the past.
-   * This map is not updated when host-affinity is not enabled
-   */
-  private final ConcurrentHashMap<String, AtomicInteger> requestsToCountMap = new ConcurrentHashMap<String, AtomicInteger>();
-  /**
-   * Indicates whether host-affinity is enabled or not
-   */
-  private final boolean hostAffinityEnabled;
-
-  private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
-
-  // TODO: Refactor such that the state class for host-affinity enabled allocator is a subclass of a generic state class
-  public ContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-                               boolean hostAffinityEnabled) {
-    this.amClient = amClient;
-    this.hostAffinityEnabled = hostAffinityEnabled;
-  }
-
-  /**
-   * This method is called every time {@link org.apache.samza.job.yarn.SamzaTaskManager} requestsQueue for a container
-   * Adds {@link org.apache.samza.job.yarn.SamzaContainerRequest} to the requestsQueue queue.
-   * If host-affinity is enabled, it updates the requestsToCountMap as well.
-   *
-   * @param request {@link org.apache.samza.job.yarn.SamzaContainerRequest} that was sent to the RM
-   */
-  public synchronized void updateRequestState(SamzaContainerRequest request) {
-
-    log.info("Requesting a container for {} at {}", request.getExpectedContainerId(), request.getPreferredHost());
-    amClient.addContainerRequest(request.getIssuedRequest());
-
-    requestsQueue.add(request);
-    String preferredHost = request.getPreferredHost();
-    if (hostAffinityEnabled) {
-      if (requestsToCountMap.containsKey(preferredHost)) {
-        requestsToCountMap.get(preferredHost).incrementAndGet();
-      } else {
-        requestsToCountMap.put(preferredHost, new AtomicInteger(1));
-      }
-      /**
-       * The following is important to correlate allocated container data with the requestsQueue made before. If
-       * the preferredHost is requested for the first time, the state should reflect that the allocatedContainers
-       * list is empty and NOT null.
-       */
-      if (!allocatedContainers.containsKey(preferredHost)) {
-        allocatedContainers.put(preferredHost, new ArrayList<Container>());
-      }
-    }
-  }
-
-  /**
-   * This method is called every time the RM returns an allocated container.
-   * Adds the allocated container resource to the correct allocatedContainers buffer
-   * @param container Container resource that was returned by the RM
-   */
-  public synchronized void addContainer(Container container) {
-    if(hostAffinityEnabled) {
-      String hostName = container.getNodeHttpAddress().split(":")[0];
-      AtomicInteger requestCount = requestsToCountMap.get(hostName);
-      // Check if this host was requested for any of the containers
-      if (requestCount == null || requestCount.get() == 0) {
-        log.debug(
-            "Request count for the allocatedContainer on {} is null or 0. This means that the host was not requested " +
-                "for running containers.Hence, saving the container {} in the buffer for ANY_HOST",
-            hostName,
-            container.getId()
-        );
-        addToAllocatedContainerList(ANY_HOST, container);
-      } else {
-        int requestCountOnThisHost = requestCount.get();
-        List<Container> allocatedContainersOnThisHost = allocatedContainers.get(hostName);
-        if (requestCountOnThisHost > 0) {
-          if (allocatedContainersOnThisHost == null || allocatedContainersOnThisHost.size() < requestCountOnThisHost) {
-            log.debug("Saving the container {} in the buffer for {}", container.getId(), hostName);
-            addToAllocatedContainerList(hostName, container);
-          }
-          else {
-              /**
-               * The RM may allocate more containers on a given host than requested. In such a case, even though the
-               * requestCount != 0, it will be greater than the total request count for that host. Hence, it should be
-               * assigned to ANY_HOST
-               */
-              log.debug(
-                  "The number of containers already allocated on {} is greater than what was " +
-                      "requested, which is {}. Hence, saving the container {} in the buffer for ANY_HOST",
-                  new Object[]{
-                      hostName,
-                      requestCountOnThisHost,
-                      container.getId()
-                  }
-              );
-              addToAllocatedContainerList(ANY_HOST, container);
-            }
-          }
-        }
-      }
-     else {
-      log.debug("Saving the container {} in the buffer for ANY_HOST", container.getId());
-      addToAllocatedContainerList(ANY_HOST, container);
-    }
-  }
-
-  // Update the allocatedContainers list
-  private void addToAllocatedContainerList(String host, Container container) {
-    List<Container> list = allocatedContainers.get(host);
-    if (list != null) {
-      list.add(container);
-    } else {
-      list = new ArrayList<Container>();
-      list.add(container);
-      allocatedContainers.put(host, list);
-    }
-  }
-
-  /**
-   * This method updates the state after a request is fulfilled and a container starts running on a host
-   * Needs to be synchronized because the state buffers are populated by the AMRMCallbackHandler, whereas it is drained by the allocator thread
-   *
-   * @param request {@link org.apache.samza.job.yarn.SamzaContainerRequest} that was fulfilled
-   * @param assignedHost  Host to which the container was assigned
-   * @param container Allocated container resource that was used to satisfy this request
-   */
-  public synchronized void updateStateAfterAssignment(SamzaContainerRequest request, String assignedHost, Container container) {
-    requestsQueue.remove(request);
-    allocatedContainers.get(assignedHost).remove(container);
-    if (hostAffinityEnabled) {
-      // assignedHost may not always be the preferred host.
-      // Hence, we should safely decrement the counter for the preferredHost
-      requestsToCountMap.get(request.getPreferredHost()).decrementAndGet();
-    }
-    // To avoid getting back excess containers
-    amClient.removeContainerRequest(request.getIssuedRequest());
-  }
-
-  /**
-   * If requestQueue is empty, all extra containers in the buffer should be released and update the entire system's state
-   * Needs to be synchronized because it is modifying shared state buffers
-   * @return the number of containers released.
-   */
-  public synchronized int releaseExtraContainers() {
-    int numReleasedContainers = 0;
-
-    if (requestsQueue.isEmpty()) {
-      log.debug("Container Requests Queue is empty.");
-
-      if (hostAffinityEnabled) {
-        List<String> allocatedHosts = getAllocatedHosts();
-        for (String host : allocatedHosts) {
-          numReleasedContainers += releaseContainersForHost(host);
-        }
-      } else {
-        numReleasedContainers += releaseContainersForHost(ANY_HOST);
-      }
-      clearState();
-    }
-    return numReleasedContainers;
-  }
-
-  /**
-   * Releases all allocated containers for the specified host.
-   * @param host  the host for which the containers should be released.
-   * @return      the number of containers released.
-   */
-  private int releaseContainersForHost(String host) {
-    int numReleasedContainers = 0;
-    List<Container> containers = getContainersOnAHost(host);
-    if (containers != null) {
-      for (Container c : containers) {
-        log.info("Releasing extra container {} allocated on {}", c.getId(), host);
-        amClient.releaseAssignedContainer(c.getId());
-        numReleasedContainers++;
-      }
-    }
-    return numReleasedContainers;
-  }
-
-  /**
-   * Releases a container that was allocated and assigned but could not be started.
-   * e.g. because of a ConnectException while trying to communicate with the NM.
-   * This method assumes the specified container and associated request have already
-   * been removed from their respective queues.
-   *
-   * @param container the {@link Container} to release.
-   */
-  public void releaseUnstartableContainer(Container container) {
-    log.info("Releasing unstartable container {}", container.getId());
-    amClient.releaseAssignedContainer(container.getId());
-  }
-
-  /**
-   * Clears all the state variables
-   * Performed when there are no more unfulfilled requests
-   * This is not synchronized because it is private.
-   */
-  private void clearState() {
-    allocatedContainers.clear();
-    requestsToCountMap.clear();
-    requestsQueue.clear();
-  }
-
-  /**
-   * Returns the list of hosts which has at least 1 allocatedContainer in the buffer
-   * @return list of host names
-   */
-  private List<String> getAllocatedHosts() {
-    List<String> hostKeys = new ArrayList<String>();
-    for(Map.Entry<String, List<Container>> entry: allocatedContainers.entrySet()) {
-      if(entry.getValue().size() > 0) {
-        hostKeys.add(entry.getKey());
-      }
-    }
-    return hostKeys;
-  }
-
-  /**
-   * Returns the list of containers allocated on a given host
-   * If no containers were ever allocated on the given host, it returns null.
-   * @param host hostname
-   * @return list of containers allocated on the given host, or null
-   */
-  public List<Container> getContainersOnAHost(String host) {
-    return allocatedContainers.get(host);
-  }
-
-  public PriorityBlockingQueue<SamzaContainerRequest> getRequestsQueue() {
-    return requestsQueue;
-  }
-
-  public ConcurrentHashMap<String, AtomicInteger> getRequestsToCountMap() {
-    return requestsToCountMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
deleted file mode 100644
index e8976bc..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.job.CommandBuilder;
-import org.apache.samza.job.ShellCommandBuilder;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ContainerUtil {
-  private static final Logger log = LoggerFactory.getLogger(ContainerUtil.class);
-
-  private final Config config;
-  private final SamzaAppState state;
-  private final YarnConfiguration yarnConfiguration;
-
-  private NMClient nmClient;
-  private final YarnConfig yarnConfig;
-  private final TaskConfig taskConfig;
-
-  public ContainerUtil(Config config,
-                       SamzaAppState state,
-                       YarnConfiguration yarnConfiguration) {
-    this.config = config;
-    this.state = state;
-    this.yarnConfiguration = yarnConfiguration;
-
-    this.nmClient = NMClient.createNMClient();
-    nmClient.init(this.yarnConfiguration);
-
-    this.yarnConfig = new YarnConfig(config);
-    this.taskConfig = new TaskConfig(config);
-  }
-
-  protected void setNmClient(NMClient nmClient){
-    this.nmClient = nmClient;
-  }
-
-  public void incrementContainerRequests() {
-    state.containerRequests.incrementAndGet();
-  }
-
-  public void runMatchedContainer(int samzaContainerId, Container container) throws SamzaContainerLaunchException {
-    state.matchedContainerRequests.incrementAndGet();
-    runContainer(samzaContainerId, container);
-  }
-
-  public void runContainer(int samzaContainerId, Container container) throws SamzaContainerLaunchException {
-    String containerIdStr = ConverterUtils.toString(container.getId());
-    log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);
-
-
-    // check if we have framework path specified. If yes - use it, if not use default ./__package/
-    String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries
-    String cmdPath = "./__package/";
-
-    String fwkPath = JobConfig.getFwkPath(config);
-    if(fwkPath != null && (! fwkPath.isEmpty())) {
-      cmdPath = fwkPath;
-      jobLib = "export JOB_LIB_DIR=./__package/lib";
-    }
-    log.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + cmdPath + ";jobLib=" + jobLib);
-
-
-    CommandBuilder cmdBuilder = getCommandBuilder(samzaContainerId, cmdPath);
-    String command = cmdBuilder.buildCommand();
-    log.info("Container ID {} using command {}", samzaContainerId, command);
-
-    Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
-    printContainerEnvironmentVariables(samzaContainerId, env);
-
-    log.info("Samza FWK path: " + command + "; env=" + env);
-
-    Path path = new Path(yarnConfig.getPackagePath());
-    log.info("Starting container ID {} using package path {}", samzaContainerId, path);
-
-    startContainer(path, container, env,
-        getFormattedCommand(ApplicationConstants.LOG_DIR_EXPANSION_VAR, jobLib, command, ApplicationConstants.STDOUT,
-            ApplicationConstants.STDERR));
-
-    if (state.neededContainers.decrementAndGet() == 0) {
-      state.jobHealthy.set(true);
-    }
-    state.runningContainers.put(samzaContainerId, new YarnContainer(container));
-
-    log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).",
-        new Object[]{samzaContainerId, containerIdStr, container
-            .getNodeId().getHost(), container.getNodeHttpAddress(), containerIdStr});
-
-    log.info("Started container ID {}", samzaContainerId);
-  }
-
-  protected void startContainer(Path packagePath,
-                                Container container,
-                                Map<String, String> env,
-                                final String cmd)
-      throws SamzaContainerLaunchException {
-    log.info("Starting container {} {} {} {}",
-        new Object[]{packagePath, container, env, cmd});
-
-    // set the local package so that the containers and app master are provisioned with it
-    LocalResource packageResource = Records.newRecord(LocalResource.class);
-    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
-    FileStatus fileStatus;
-    try {
-      fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
-    } catch (IOException ioe) {
-      log.error("IO Exception when accessing the package status from the filesystem", ioe);
-      throw new SamzaException("IO Exception when accessing the package status from the filesystem");
-    }
-
-    packageResource.setResource(packageUrl);
-    packageResource.setSize(fileStatus.getLen());
-    packageResource.setTimestamp(fileStatus.getModificationTime());
-    packageResource.setType(LocalResourceType.ARCHIVE);
-    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
-
-    ByteBuffer allTokens;
-    // copy tokens (copied from dist shell example)
-    try {
-      Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-      DataOutputBuffer dob = new DataOutputBuffer();
-      credentials.writeTokenStorageToStream(dob);
-
-      // now remove the AM->RM token so that containers cannot access it
-      Iterator iter = credentials.getAllTokens().iterator();
-      while (iter.hasNext()) {
-        TokenIdentifier token = ((Token) iter.next()).decodeIdentifier();
-        if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
-          iter.remove();
-        }
-      }
-      allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-
-    } catch (IOException ioe) {
-      ioe.printStackTrace();
-      throw new SamzaException("IO Exception when writing credentials to output buffer");
-    }
-
-    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
-    context.setEnvironment(env);
-    context.setTokens(allTokens.duplicate());
-    context.setCommands(new ArrayList<String>() {{add(cmd);}});
-    context.setLocalResources(Collections.singletonMap("__package", packageResource));
-
-    log.debug("setting package to {}", packageResource);
-    log.debug("setting context to {}", context);
-
-    StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class);
-    startContainerRequest.setContainerLaunchContext(context);
-    try {
-      nmClient.startContainer(container, context);
-    } catch (YarnException ye) {
-      log.error("Received YarnException when starting container: " + container.getId(), ye);
-      throw new SamzaContainerLaunchException("Received YarnException when starting container: " + container.getId(), ye);
-    } catch (IOException ioe) {
-      log.error("Received IOException when starting container: " + container.getId(), ioe);
-      throw new SamzaContainerLaunchException("Received IOException when starting container: " + container.getId(), ioe);
-    }
-  }
-
-  private String getFormattedCommand(String logDirExpansionVar,
-                                     String jobLib,
-                                     String command,
-                                     String stdOut,
-                                     String stdErr) {
-    if (!jobLib.isEmpty()) {
-      jobLib = "&& " + jobLib; // add job's libraries exported to an env variable
-    }
-
-    return String
-        .format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar,
-            jobLib, logDirExpansionVar, command, stdOut, stdErr);
-  }
-
-  /**
-   * Instantiates and initializes the configured {@link CommandBuilder} class.
-   *
-   * @param samzaContainerId  the Samza container Id for which the container start command will be built.
-   * @return                  the command builder, which is initialized and ready to build the command.
-   */
-  private CommandBuilder getCommandBuilder(int samzaContainerId, String cmdPath) {
-    String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
-    CommandBuilder cmdBuilder = (CommandBuilder) Util.getObj(cmdBuilderClassName);
-    cmdBuilder.setConfig(config).setId(samzaContainerId).setUrl(state.coordinatorUrl).setCommandPath(cmdPath);
-    return cmdBuilder;
-  }
-
-  /**
-   * Gets the environment variables from the specified {@link CommandBuilder} and escapes certain characters.
-   *
-   * @param cmdBuilder        the command builder containing the environment variables.
-   * @return                  the map containing the escaped environment variables.
-   */
-  private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
-    Map<String, String> env = new HashMap<String, String>();
-    for (Map.Entry<String, String> entry : cmdBuilder.buildEnvironment().entrySet()) {
-      String escapedValue = Util.envVarEscape(entry.getValue());
-      env.put(entry.getKey(), escapedValue);
-    }
-
-    return env;
-  }
-
-  /**
-   * @param samzaContainerId  the Samza container Id for logging purposes.
-   * @param env               the Map of environment variables to their respective values.
-   */
-  private void printContainerEnvironmentVariables(int samzaContainerId, Map<String, String> env) {
-    StringBuilder sb = new StringBuilder();
-    for (Map.Entry<String, String> entry : env.entrySet()) {
-      sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue()));
-    }
-    log.info("Container ID {} using environment variables: {}", samzaContainerId, sb.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
deleted file mode 100644
index 1d101fa..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.samza.config.YarnConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is the allocator thread that will be used by SamzaTaskManager when host-affinity is enabled for a job. It is similar to {@link org.apache.samza.job.yarn.ContainerAllocator}, except that it considers container locality for allocation.
- *
- * In case of host-affinity, each container request ({@link org.apache.samza.job.yarn.SamzaContainerRequest} encapsulates the identifier of the container to be run and a "preferredHost". preferredHost is determined by the locality mappings in the coordinator stream.
- * This thread periodically wakes up and makes the best-effort to assign a container to the preferredHost. If the preferredHost is not returned by the RM before the corresponding container expires, the thread assigns the container to any other host that is allocated next.
- * The container expiry is determined by CONTAINER_REQUEST_TIMEOUT and is configurable on a per-job basis.
- *
- * If there aren't enough containers, it waits by sleeping for ALLOCATOR_SLEEP_TIME milliseconds.
- */
-public class HostAwareContainerAllocator extends AbstractContainerAllocator {
-  private static final Logger log = LoggerFactory.getLogger(HostAwareContainerAllocator.class);
-
-  private final int CONTAINER_REQUEST_TIMEOUT;
-
-  public HostAwareContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-                                     ContainerUtil containerUtil,
-                                     YarnConfig yarnConfig) {
-    super(amClient, containerUtil, new ContainerRequestState(amClient, true), yarnConfig);
-    this.CONTAINER_REQUEST_TIMEOUT = yarnConfig.getContainerRequestTimeout();
-  }
-
-  /**
-   * Since host-affinity is enabled, all allocated container resources are buffered in the list keyed by "preferredHost".
-   *
-   * If the requested host is not available, the thread checks to see if the request has expired.
-   * If it has expired, it runs the container with expectedContainerID on one of the available hosts from the
-   * allocatedContainers buffer keyed by "ANY_HOST".
-   */
-  @Override
-  public void assignContainerRequests() {
-    while (hasPendingRequest()) {
-      SamzaContainerRequest request = peekPendingRequest();
-      String preferredHost = request.getPreferredHost();
-      int expectedContainerId = request.getExpectedContainerId();
-
-      log.info("Handling request for container id {} on preferred host {}", expectedContainerId, preferredHost);
-
-      if (hasAllocatedContainer(preferredHost)) {
-        // Found allocated container at preferredHost
-        runContainer(request, preferredHost);
-      } else {
-        // No allocated container on preferredHost
-        log.info("Did not find any allocated containers on preferred host {} for running container id {}",
-            preferredHost, expectedContainerId);
-
-        boolean expired = requestExpired(request);
-        if (expired && hasAllocatedContainer(ANY_HOST)) {
-          runContainer(request, ANY_HOST);
-        } else {
-          log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't "
-                  + "find any free allocated containers in the buffer. Breaking out of loop.",
-              request.getRequestTimestamp(), CONTAINER_REQUEST_TIMEOUT);
-          break;
-        }
-      }
-    }
-
-  }
-
-  private boolean requestExpired(SamzaContainerRequest request) {
-    return System.currentTimeMillis() - request.getRequestTimestamp() > CONTAINER_REQUEST_TIMEOUT;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
deleted file mode 100644
index c116ed8..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-import java.util.Map;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.samza.coordinator.JobModelManager;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-
-import java.net.URL;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class SamzaAppState {
-  /**
-   * Represents an invalid or unknown Samza container ID.
-   */
-  private static final int UNUSED_CONTAINER_ID = -1;
-
-  /**
-   * Job Coordinator is started in the AM and follows the {@link org.apache.samza.job.yarn.SamzaAppMasterService}
-   * lifecycle. It helps querying JobModel related info in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
-   * and locality information when host-affinity is enabled in {@link org.apache.samza.job.yarn.SamzaTaskManager}
-   */
-  public final JobModelManager jobCoordinator;
-
-  /*  The following state variables are primarily used for reference in the AM web services   */
-  /**
-   * Task Id of the AM
-   * Used for displaying in the AM UI. Usage found in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
-   * and scalate/WEB-INF/views/index.scaml
-   */
-  public final int taskId;
-  /**
-   * Id of the AM container (as allocated by the RM)
-   * Used for displaying in the AM UI. Usage in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
-   * and scalate/WEB-INF/views/index.scaml
-   */
-  public final ContainerId amContainerId;
-  /**
-   * Host name of the NM on which the AM is running
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   */
-  public final String nodeHost;
-  /**
-   * NM port on which the AM is running
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   */
-  public final int nodePort;
-  /**
-   * Http port of the NM on which the AM is running
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   */
-  public final int nodeHttpPort;
-  /**
-   * Application Attempt Id as provided by Yarn
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   * and {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
-   */
-  public final ApplicationAttemptId appAttemptId;
-  /**
-   * JMX Server URL, if enabled
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   */
-  public String jmxUrl = "";
-  /**
-   * JMX Server Tunneling URL, if enabled
-   * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
-   */
-  public String jmxTunnelingUrl = "";
-  /**
-   * Job Coordinator URL
-   * Usage in {@link org.apache.samza.job.yarn.SamzaAppMasterService} &amp; ContainerUtil
-   */
-  public URL coordinatorUrl = null;
-  /**
-   * URL of the {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
-   */
-  public URL rpcUrl = null;
-  /**
-   * URL of the {@link org.apache.samza.webapp.ApplicationMasterWebServlet}
-   */
-  public URL trackingUrl = null;
-
-  /**
-   * The following state variables are required for the correct functioning of the TaskManager
-   * Some of them are shared between the AMRMCallbackThread and the ContainerAllocator thread, as mentioned below.
-   */
-
-  /**
-   * Number of containers that have completed their execution and exited successfully
-   */
-  public AtomicInteger completedContainers = new AtomicInteger(0);
-
-  /**
-   * Number of failed containers
-   * */
-  public AtomicInteger failedContainers = new AtomicInteger(0);
-
-  /**
-   * Number of containers released due to extra allocation returned by the RM
-   */
-  public AtomicInteger releasedContainers = new AtomicInteger(0);
-
-  /**
-   * ContainerStatus of failed containers.
-   */
-  public ConcurrentMap<String, ContainerStatus> failedContainersStatus = new ConcurrentHashMap<String, ContainerStatus>();
-
-  /**
-   * Number of containers configured for the job
-   */
-  public int containerCount = 0;
-
-  /**
-   * Set of finished containers - TODO: Can be changed to a counter
-   */
-  public Set<Integer> finishedContainers = new HashSet<Integer>();
-
-  /**
-   *  Number of containers needed for the job to be declared healthy
-   *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
-   */
-  public AtomicInteger neededContainers = new AtomicInteger(0);
-
-  /**
-   *  Map of the samzaContainerId to the {@link org.apache.samza.job.yarn.YarnContainer} on which it is running
-   *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
-   */
-  public ConcurrentMap<Integer, YarnContainer> runningContainers = new ConcurrentHashMap<Integer, YarnContainer>(0);
-
-  /**
-   * Final status of the application
-   * Modified by both the AMRMCallbackThread and the ContainerAllocator thread
-   */
-  public FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
-
-  /**
-   * State indicating whether the job is healthy or not
-   * Modified by both the AMRMCallbackThread and the ContainerAllocator thread
-   */
-  public AtomicBoolean jobHealthy = new AtomicBoolean(true);
-
-  public AtomicInteger containerRequests = new AtomicInteger(0);
-
-  public AtomicInteger matchedContainerRequests = new AtomicInteger(0);
-
-  public SamzaAppState(JobModelManager jobModelManager,
-                       int taskId,
-                       ContainerId amContainerId,
-                       String nodeHost,
-                       int nodePort,
-                       int nodeHttpPort) {
-    this.jobCoordinator = jobModelManager;
-    this.taskId = taskId;
-    this.amContainerId = amContainerId;
-    this.nodeHost = nodeHost;
-    this.nodePort = nodePort;
-    this.nodeHttpPort = nodeHttpPort;
-    this.appAttemptId = amContainerId.getApplicationAttemptId();
-
-  }
-
-  /**
-   * Returns the Samza container ID if the specified YARN container ID corresponds to a running container.
-   *
-   * @param yarnContainerId the YARN container ID.
-   * @return                the Samza container ID if it is running,
-   *                        otherwise {@link SamzaAppState#UNUSED_CONTAINER_ID}.
-   */
-  public int getRunningSamzaContainerId(ContainerId yarnContainerId) {
-    int containerId = UNUSED_CONTAINER_ID;
-    for(Map.Entry<Integer, YarnContainer> entry: runningContainers.entrySet()) {
-      if(entry.getValue().id().equals(yarnContainerId)) {
-        containerId = entry.getKey();
-        break;
-      }
-    }
-    return containerId;
-  }
-
-  /**
-   * @param samzaContainerId  the Samza container ID to validate.
-   * @return                  {@code true} if the ID is valid, {@code false} otherwise
-   */
-  public static boolean isValidContainerId(int samzaContainerId) {
-    return samzaContainerId != UNUSED_CONTAINER_ID;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
deleted file mode 100644
index 4a04eb6..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-
-/**
- * SamzaContainerRequest encapsulate the ContainerRequest and the meta-information of the request.
- */
-public class SamzaContainerRequest implements Comparable<SamzaContainerRequest> {
-  private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
-
-  private Priority priority;
-  private Resource capability;
-
-  /**
-   *  If host-affinity is enabled, the request specifies a preferred host for the container
-   *  If not, preferredHost defaults to ANY_HOST
-   */
-  private String preferredHost;
-  // Timestamp at which the request is issued. Used to check request expiry
-  private Long requestTimestamp;
-  // Actual Container Request that is issued to the RM
-  public AMRMClient.ContainerRequest issuedRequest;
-  // Container Id that is expected to run on the container returned for this request
-  public int expectedContainerId;
-
-  public SamzaContainerRequest(int memoryMb, int cpuCores, int priority, int expectedContainerId, String preferredHost) {
-    this.capability = Resource.newInstance(memoryMb, cpuCores);
-    this.priority = Priority.newInstance(priority);
-    this.expectedContainerId = expectedContainerId;
-    if (preferredHost == null || preferredHost.equals(ANY_HOST)) {
-      this.preferredHost = ANY_HOST;
-      this.issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, this.priority);
-    } else {
-      this.preferredHost = preferredHost;
-      this.issuedRequest = new AMRMClient.ContainerRequest(
-          capability,
-          new String[]{this.preferredHost},
-          null,
-          this.priority);
-    }
-
-    this.requestTimestamp = System.currentTimeMillis();
-  }
-
-  // Convenience constructor for unit testing
-  protected SamzaContainerRequest(int expectedContainerId, String preferredHost) {
-    this(
-        AbstractContainerAllocator.DEFAULT_CONTAINER_MEM,
-        AbstractContainerAllocator.DEFAULT_CPU_CORES,
-        AbstractContainerAllocator.DEFAULT_PRIORITY,
-        expectedContainerId,
-        preferredHost);
-  }
-
-  public Resource getCapability() {
-    return capability;
-  }
-
-  public Priority getPriority() {
-    return priority;
-  }
-
-  public AMRMClient.ContainerRequest getIssuedRequest() {
-    return issuedRequest;
-  }
-
-  public int getExpectedContainerId() {
-    return expectedContainerId;
-  }
-
-  public String getPreferredHost() {
-    return preferredHost;
-  }
-
-  public Long getRequestTimestamp() {
-    return requestTimestamp;
-  }
-
-  @Override
-  public String toString() {
-    return "[requestIssueTime=" + issuedRequest.toString() + "]";
-  }
-
-  @Override
-  public int compareTo(SamzaContainerRequest o) {
-    if(requestTimestamp < o.requestTimestamp)
-      return -1;
-    if(requestTimestamp > o.requestTimestamp)
-      return 1;
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
deleted file mode 100644
index bc95f31..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Samza's application master is mostly interested in requesting containers to
- * run Samza jobs. SamzaTaskManager is responsible for requesting new
- * containers, handling failures, and notifying the application master that the
- * job is done.
- *
- * The following main threads are involved in the execution of the Samza AM:
- *  - The main thread (defined in SamzaAppMaster) that drive the AM to send out container requests to RM
- *  - The callback handler thread that receives the responses from RM and handles:
- *      - Populating a buffer when a container is allocated by the RM
- *        (allocatedContainers in {@link org.apache.samza.job.yarn.ContainerRequestState}
- *      - Identifying the cause of container failure & re-request containers from RM by adding request to the
- *        internal requestQueue in {@link org.apache.samza.job.yarn.ContainerRequestState}
- *  - The allocator thread defined here assigns the allocated containers to pending requests
- *    (See {@link org.apache.samza.job.yarn.ContainerAllocator} or {@link org.apache.samza.job.yarn.HostAwareContainerAllocator})
- */
-
-class SamzaTaskManager implements YarnAppMasterListener {
-  private static final Logger log = LoggerFactory.getLogger(SamzaTaskManager.class);
-
-  private final boolean hostAffinityEnabled;
-  private final SamzaAppState state;
-
-  // Derived configs
-  private final JobConfig jobConfig;
-  private final YarnConfig yarnConfig;
-
-  private final AbstractContainerAllocator containerAllocator;
-  private final Thread allocatorThread;
-
-  // State
-  private boolean tooManyFailedContainers = false;
-  private Map<Integer, ContainerFailure> containerFailures = new HashMap<Integer, ContainerFailure>();
-
-  public SamzaTaskManager(Config config,
-                          SamzaAppState state,
-                          AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
-                          YarnConfiguration conf) {
-    this.state = state;
-    this.jobConfig = new JobConfig(config);
-    this.yarnConfig = new YarnConfig(config);
-
-    this.hostAffinityEnabled = yarnConfig.getHostAffinityEnabled();
-
-    if (this.hostAffinityEnabled) {
-      this.containerAllocator = new HostAwareContainerAllocator(
-          amClient,
-          new ContainerUtil(config, state, conf),
-          yarnConfig
-      );
-    } else {
-      this.containerAllocator = new ContainerAllocator(
-          amClient,
-          new ContainerUtil(config, state, conf),
-          yarnConfig);
-    }
-
-    this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
-  }
-
-  @Override
-  public boolean shouldShutdown() {
-    return tooManyFailedContainers || state.completedContainers.get() == state.containerCount || !allocatorThread.isAlive();
-  }
-
-  @Override
-  public void onInit() {
-    state.containerCount = jobConfig.getContainerCount();
-
-    state.neededContainers.set(state.containerCount);
-
-    // Request initial set of containers
-    Map<Integer, String> containerToHostMapping = state.jobCoordinator.jobModel().getAllContainerLocality();
-
-    containerAllocator.requestContainers(containerToHostMapping);
-
-    // Start container allocator thread
-    log.info("Starting the container allocator thread");
-    allocatorThread.start();
-  }
-
-  @Override
-  public void onReboot() {
-
-  }
-
-  @Override
-  public void onShutdown() {
-    // Shutdown allocator thread
-    containerAllocator.setIsRunning(false);
-    try {
-      allocatorThread.join();
-    } catch (InterruptedException ie) {
-      log.info("Allocator Thread join() threw an interrupted exception", ie);
-      // Should we throw exception here??
-    }
-  }
-
-  @Override
-  public void onContainerAllocated(Container container) {
-    containerAllocator.addContainer(container);
-  }
-
-  /**
-   * This methods handles the onResourceCompleted callback from the RM. Based on the ContainerExitStatus, it decides
-   * whether a container that exited is marked as complete or failure.
-   */
-  @Override
-  public void onContainerCompleted(ContainerStatus containerStatus) {
-    String containerIdStr = ConverterUtils.toString(containerStatus.getContainerId());
-    int containerId = state.getRunningSamzaContainerId(containerStatus.getContainerId());
-    state.runningContainers.remove(containerId);
-
-    int exitStatus = containerStatus.getExitStatus();
-    switch(exitStatus) {
-      case ContainerExitStatus.SUCCESS:
-        log.info("Container {} completed successfully.", containerIdStr);
-
-        state.completedContainers.incrementAndGet();
-
-        if (SamzaAppState.isValidContainerId(containerId)) {
-          state.finishedContainers.add(containerId);
-          containerFailures.remove(containerId);
-        }
-
-        if (state.completedContainers.get() == state.containerCount) {
-          log.info("Setting job status to SUCCEEDED, since all containers have been marked as completed.");
-          state.status = FinalApplicationStatus.SUCCEEDED;
-        }
-        break;
-
-      case ContainerExitStatus.DISKS_FAILED:
-      case ContainerExitStatus.ABORTED:
-      case ContainerExitStatus.PREEMPTED:
-        log.info("Got an exit code of {}. This means that container {} was "
-            + "killed by YARN, either due to being released by the application "
-            + "master or being 'lost' due to node failures etc. or due to preemption by the RM",
-            exitStatus,
-            containerIdStr);
-
-        state.releasedContainers.incrementAndGet();
-        // If this container was assigned some partitions (a containerId), then
-        // clean up, and request a new container for the tasks. This only
-        // should happen if the container was 'lost' due to node failure, not
-        // if the AM released the container.
-        if (SamzaAppState.isValidContainerId(containerId)) {
-          log.info("Released container {} was assigned task group ID {}. Requesting a new container for the task group.", containerIdStr, containerId);
-
-          state.neededContainers.incrementAndGet();
-          state.jobHealthy.set(false);
-
-          // request a container on new host
-          containerAllocator.requestContainer(containerId, ContainerAllocator.ANY_HOST);
-        }
-
-        break;
-
-      default:
-        // TODO: Handle failure more intelligently. Should track NodeFailures!
-        log.info("Container failed for some reason. Let's start it again");
-        log.info("Container " + containerIdStr + " failed with exit code " + exitStatus + " - " + containerStatus.getDiagnostics());
-
-        state.failedContainers.incrementAndGet();
-        state.failedContainersStatus.put(containerIdStr, containerStatus);
-        state.jobHealthy.set(false);
-
-        if(SamzaAppState.isValidContainerId(containerId)) {
-          state.neededContainers.incrementAndGet();
-          recordContainerFailCount(containerIdStr, containerId);
-
-          if (!tooManyFailedContainers) {
-            // Find out previously running container location
-            String lastSeenOn = state.jobCoordinator.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
-            if (!hostAffinityEnabled || lastSeenOn == null) {
-              lastSeenOn = ContainerAllocator.ANY_HOST;
-            }
-            // Request a new container
-            containerAllocator.requestContainer(containerId, lastSeenOn);
-          }
-        }
-
-    }
-  }
-
-  /**
-   * Increments the failure count, logs the failure, and records the  last failure time for the specified container.
-   * Also, updates the global flag indicating whether too many failures have occurred and returns that flag.
-   *
-   * @param containerIdStr  the YARN container Id for logging purposes.
-   * @param containerId     the Samza container/group Id that failed.
-   * @return                true if any container has failed more than the max number of times.
-   */
-  private boolean recordContainerFailCount(String containerIdStr, int containerId) {
-    // A container failed for an unknown reason. Let's check to see if
-    // we need to shutdown the whole app master if too many container
-    // failures have happened. The rules for failing are that the
-    // failure count for a task group id must be > the configured retry
-    // count, and the last failure (the one prior to this one) must have
-    // happened less than retry window ms ago. If retry count is set to
-    // 0, the app master will fail on any container failure. If the
-    // retry count is set to a number < 0, a container failure will
-    // never trigger an app master failure.
-    int retryCount = yarnConfig.getContainerRetryCount();
-    int retryWindowMs = yarnConfig.getContainerRetryWindowMs();
-
-    if (retryCount == 0) {
-      log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, containerIdStr);
-
-      tooManyFailedContainers = true;
-    } else if (retryCount > 0) {
-      int currentFailCount;
-      long lastFailureTime;
-      if(containerFailures.containsKey(containerId)) {
-        ContainerFailure failure = containerFailures.get(containerId);
-        currentFailCount = failure.getCount() + 1;
-        lastFailureTime = failure.getLastFailure();
-        } else {
-        currentFailCount = 1;
-        lastFailureTime = 0L;
-      }
-      if (currentFailCount >= retryCount) {
-        long lastFailureMsDiff = System.currentTimeMillis() - lastFailureTime;
-
-        if (lastFailureMsDiff < retryWindowMs) {
-          log.error("Container ID " + containerId + "(" + containerIdStr + ") has failed " + currentFailCount +
-              " times, with last failure " + lastFailureMsDiff + "ms ago. This is greater than retry count of " +
-              retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed.");
-
-          // We have too many failures, and we're within the window
-          // boundary, so reset shut down the app master.
-          tooManyFailedContainers = true;
-          state.status = FinalApplicationStatus.FAILED;
-        } else {
-          log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for " +
-              "this container ID was outside the bounds of the retry window.", containerId, containerIdStr);
-
-          // Reset counter back to 1, since the last failure for this
-          // container happened outside the window boundary.
-          containerFailures.put(containerId, new ContainerFailure(1, System.currentTimeMillis()));
-        }
-      } else {
-        log.info("Current fail count for container ID {} is {}.", containerId, currentFailCount);
-        containerFailures.put(containerId, new ContainerFailure(currentFailCount, System.currentTimeMillis()));
-      }
-    }
-    return tooManyFailedContainers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
index 57092e1..7e563f1 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
@@ -53,29 +53,24 @@ public class YarnAppState {
 
   public ConcurrentMap<String, ContainerStatus> failedContainersStatus = new ConcurrentHashMap<String, ContainerStatus>();
 
-  public YarnAppState(JobModelManager jobModelManager,
-                      int taskId,
+  public YarnAppState(int taskId,
                       ContainerId amContainerId,
                       String nodeHost,
                       int nodePort,
-                      int nodeHttpPort,
-                      SamzaApplicationState state) {
-    this.jobModelManager = jobModelManager;
+                      int nodeHttpPort
+                      ) {
     this.taskId = taskId;
     this.amContainerId = amContainerId;
     this.nodeHost = nodeHost;
     this.nodePort = nodePort;
     this.nodeHttpPort = nodeHttpPort;
     this.appAttemptId = amContainerId.getApplicationAttemptId();
-    this.samzaAppState = state;
   }
 
 
   @Override
   public String toString() {
     return "YarnAppState{" +
-        "samzaAppState=" + samzaAppState +
-        ", jobModelReader=" + jobModelManager +
         ", taskId=" + taskId +
         ", amContainerId=" + amContainerId +
         ", nodeHost='" + nodeHost + '\'' +
@@ -90,7 +85,6 @@ public class YarnAppState {
         '}';
   }
 
-  public final SamzaApplicationState samzaAppState;
    /* The following state variables are primarily used for reference in the AM web services   */
 
   /**
@@ -98,7 +92,6 @@ public class YarnAppState {
    * Used for displaying in the AM UI. Usage found in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
    * and scalate/WEB-INF/views/index.scaml
    */
-  public final JobModelManager jobModelManager;
 
   public final int taskId;
   /**
@@ -132,7 +125,7 @@ public class YarnAppState {
   //TODO: Make the below 3 variables immutable. Tracked as a part of SAMZA-902. Save for later.
   /**
    * Job Coordinator URL
-   * Usage in {@link org.apache.samza.job.yarn.SamzaAppMasterService} &amp; YarnContainerRunner
+   * Usage in {@link org.apache.samza.job.yarn.SamzaYarnAppMasterService} &amp; YarnContainerRunner
    */
   public URL coordinatorUrl = null;
   /**


[2/3] samza git commit: SAMZA-903: Refactor UI state variables

Posted by ni...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 7778a38..1fac7f4 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.job.yarn;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -40,6 +42,7 @@ import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -90,6 +93,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
    */
   private final SamzaYarnAppMasterService service;
 
+  private final YarnConfig yarnConfig;
 
   /**
    * State variables to map Yarn specific callbacks into Samza specific callbacks.
@@ -126,18 +130,19 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     int nodePort = Integer.parseInt(nodePortString);
     int nodeHttpPort = Integer.parseInt(nodeHttpPortString);
     YarnConfig yarnConfig = new YarnConfig(config);
+    this.yarnConfig = yarnConfig;
     int interval = yarnConfig.getAMPollIntervalMs();
 
     //Instantiate the AM Client.
     this.amClient = AMRMClientAsync.createAMRMClientAsync(interval, this);
 
-    this.state = new YarnAppState(jobModelManager, -1, containerId, nodeHostString, nodePort, nodeHttpPort, samzaAppState);
+    this.state = new YarnAppState(-1, containerId, nodeHostString, nodePort, nodeHttpPort);
 
     log.info("Initialized YarnAppState: {}", state.toString());
-    this.service = new SamzaYarnAppMasterService(config, this.state, registry);
+    this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, hConfig);
 
     log.info("ContainerID str {}, Nodehost  {} , Nodeport  {} , NodeHttpport {}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort});
-    this.lifecycle = new SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), yarnConfig.getContainerMaxCpuCores(), state, amClient );
+    this.lifecycle = new SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), yarnConfig.getContainerMaxCpuCores(), samzaAppState, state, amClient );
 
     yarnContainerRunner = new YarnContainerRunner(config, hConfig);
   }
@@ -312,6 +317,33 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     amClient.stop();
     log.info("Stopping the AM service " );
     service.onShutdown();
+
+    if(status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
+      cleanupStagingDir();
+    }
+  }
+
+  /**
+   * Cleans up the staging directory of the job, if one is configured. A failure to
+   * obtain the file system is logged with its stack trace and then swallowed.
+   */
+  private void cleanupStagingDir() {
+    String yarnJobStagingDirectory = yarnConfig.getYarnJobStagingDirectory();
+    if(yarnJobStagingDirectory != null) {
+      JobContext context = new JobContext();
+      context.setAppStagingDir(new Path(yarnJobStagingDirectory));
+
+      FileSystem fs = null;
+      try {
+        fs = FileSystem.get(hConfig);
+      } catch (IOException e) {
+        log.error("Unable to clean up file system", e); // throwable as trailing arg, no placeholder, so SLF4J logs the stack trace
+        return;
+      }
+      if(fs != null) {
+        YarnJobUtil.cleanupStagingDir(context, fs);
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index 93660c7..93176ff 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -15,7 +15,8 @@
   KIND, either express or implied.  See the License for the
   specific language governing permissions and limitations
   under the License.
--@ val state: org.apache.samza.job.yarn.SamzaAppState
+-@ val state: org.apache.samza.job.yarn.YarnAppState
+-@ val samzaAppState: org.apache.samza.clustermanager.SamzaApplicationState
 -@ val config: scala.collection.immutable.TreeMap[String, String]
 -@ val rmHttpAddress: String
 -@ val jobName: String = config.get("job.name").getOrElse("MISSING JOB NAME")
@@ -91,10 +92,10 @@
               %a(target="_blank" href="http://#{state.nodeHost}:#{state.nodeHttpPort.toString}/node/containerlogs/#{state.amContainerId.toString}/#{username}")= state.amContainerId.toString
           %tr
             %td.key JMX server url
-            %td= state.jmxUrl
+            %td= samzaAppState.jmxUrl
           %tr
             %td.key JMX server tunneling url
-            %td= state.jmxTunnelingUrl
+            %td= samzaAppState.jmxTunnelingUrl
 
     %div.tab-pane#containers
       %h2 Containers
@@ -102,16 +103,16 @@
         %tr
           %tr
             %td.key Completed
-            %td= state.completedContainers.toString
+            %td= samzaAppState.completedContainers.toString
           %tr
             %td.key Needed
-            %td= state.neededContainers.toString
+            %td= samzaAppState.neededContainers.toString
           %tr
             %td.key Failed
-            %td= state.failedContainers.toString
+            %td= samzaAppState.failedContainers.toString
           %tr
             %td.key Released
-            %td= state.releasedContainers.toString
+            %td= samzaAppState.releasedContainers.toString
 
       %h2 Running Containers
       %table.table.table-striped.table-bordered.tablesorter#containers-table
@@ -124,7 +125,7 @@
             %th Up Time
             %th JMX access
         %tbody
-          - for((containerId, container) <- state.runningContainers)
+          - for((containerId, container) <- state.runningYarnContainers)
             %tr
               %td #{containerId.toString}
               %td
@@ -136,8 +137,8 @@
               %td
                 Up time: #{container.upTimeStr()}
               %td
-                Ordinary: #{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_URL_KEY)}
-                Tunneling: #{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)}
+                Ordinary: #{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_URL_KEY)}
+                Tunneling: #{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)}
 
       %h2 Failed Containers
       %table.table.table-striped.table-bordered.tablesorter#containers-table
@@ -162,10 +163,10 @@
         %tbody
           %tr
             %td.key Total
-            %td= state.containerCount.toString
+            %td= samzaAppState.containerCount.toString
           %tr
             %td.key Finished
-            %td= state.finishedContainers.size.toString
+            %td= samzaAppState.finishedContainers.toString
 
       %h3 TaskName Assignment
       %table.table.table-striped.table-bordered.tablesorter#taskids-table
@@ -176,8 +177,8 @@
             %th SystemStreamPartitions
             %th Container
         %tbody
-          - for((containerId, container) <- state.runningContainers)
-            - val containerModel = state.jobCoordinator.jobModel.getContainers.get(containerId)
+          - for((containerId, container) <- state.runningYarnContainers)
+            - val containerModel = samzaAppState.jobModelManager.jobModel.getContainers.get(containerId)
             - for((taskName, taskModel) <- containerModel.getTasks)
               %tr
                 %td= containerId

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
deleted file mode 100644
index 1fb18be..0000000
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import java.io.IOException
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem}
-
-import scala.collection.JavaConversions.asScalaBuffer
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, Container, ContainerStatus, NodeReport}
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.Config
-import org.apache.samza.config.ShellCommandConfig
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.YarnConfig
-import org.apache.samza.metrics.JmxServer
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.util.hadoop.HttpFileSystem
-import org.apache.samza.util.Logging
-import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.SamzaException
-
-/**
- * When YARN executes an application master, it needs a bash command to
- * execute. For Samza, YARN will execute this main method when starting Samza's
- * YARN application master.
- *
- * <br/><br/>
- *
- * The main method gets all of the environment variables (passed by Samza's
- * YARN client, and YARN itself), and wires up everything to run Samza's
- * application master.
- */
-object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler {
-  val DEFAULT_POLL_INTERVAL_MS: Int = 1000
-  var listeners: List[YarnAppMasterListener] = null
-  var storedException: Throwable = null
-
-  def main(args: Array[String]) {
-    putMDC("containerName", "samza-application-master")
-    val containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString)
-    info("got container id: %s" format containerIdStr)
-    val containerId = ConverterUtils.toContainerId(containerIdStr)
-    val applicationAttemptId = containerId.getApplicationAttemptId
-    info("got app attempt id: %s" format applicationAttemptId)
-    val nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.toString)
-    info("got node manager host: %s" format nodeHostString)
-    val nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.toString)
-    info("got node manager port: %s" format nodePortString)
-    val nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString)
-    info("got node manager http port: %s" format nodeHttpPortString)
-    val coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG), classOf[Config]))
-    info("got coordinator system config: %s" format coordinatorSystemConfig)
-    val registry = new MetricsRegistryMap
-    val jobCoordinator = JobModelManager(coordinatorSystemConfig, registry)
-    val config = jobCoordinator.jobModel.getConfig
-    val yarnConfig = new YarnConfig(config)
-    info("got config: %s" format coordinatorSystemConfig)
-    putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can not find the job name")))
-    putMDC("jobId", config.getJobId.getOrElse("1"))
-    val hConfig = new YarnConfiguration
-    hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)
-    val interval = yarnConfig.getAMPollIntervalMs
-    val amClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](interval, this)
-    val clientHelper = new ClientHelper(hConfig)
-    val containerMem = yarnConfig.getContainerMaxMemoryMb
-    val containerCpu = yarnConfig.getContainerMaxCpuCores
-    val jmxServer = if (yarnConfig.getJmxServerEnabled) Some(new JmxServer()) else None
-    val jobContext = new JobContext
-    Option(yarnConfig.getYarnJobStagingDirectory).map {
-      jobStagingDirectory => jobContext.setAppStagingDir(new Path(jobStagingDirectory))
-    }
-
-    // wire up all of the yarn event listeners
-    val state = new SamzaAppState(jobCoordinator, -1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
-    try {
-      if (jmxServer.isDefined) {
-        state.jmxUrl = jmxServer.get.getJmxUrl
-        state.jmxTunnelingUrl = jmxServer.get.getTunnelingJmxUrl
-      }
-
-      val service = new SamzaAppMasterService(config, state, registry, clientHelper, hConfig)
-      val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient)
-      val metrics = new SamzaAppMasterMetrics(config, state, registry)
-      val taskManager = new SamzaTaskManager(config, state, amClient, hConfig)
-
-      listeners =  List(service, lifecycle, metrics, taskManager)
-      run(amClient, listeners, hConfig, interval)
-    } finally {
-      if (state.status != FinalApplicationStatus.UNDEFINED) {
-        YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(hConfig))
-      }
-
-      // jmxServer has to be stopped or will prevent process from exiting.
-      if (jmxServer.isDefined) {
-        jmxServer.get.stop
-      }
-    }
-  }
-
-  def run(amClient: AMRMClientAsync[ContainerRequest], listeners: List[YarnAppMasterListener], hConfig: YarnConfiguration, interval: Int): Unit = {
-    try {
-      amClient.init(hConfig)
-      amClient.start
-      listeners.foreach(_.onInit)
-      var isShutdown: Boolean = false
-      // have the loop to prevent the process from exiting until the job is to shutdown or error occurs on amClient
-      while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _) && storedException == null) {
-        try {
-          Thread.sleep(interval)
-        } catch {
-          case e: InterruptedException => {
-            isShutdown = true
-            info("got interrupt in app master thread, so shutting down")
-          }
-        }
-      }
-    } finally {
-      // listeners has to be stopped
-      listeners.foreach(listener => try {
-        listener.onShutdown
-      } catch {
-        case e: Exception => warn("Listener %s failed to shutdown." format listener, e)
-      })
-
-      // amClient has to be stopped
-      amClient.stop
-    }
-  }
-
-  override def onContainersCompleted(statuses: java.util.List[ContainerStatus]): Unit =
-    statuses.foreach(containerStatus => listeners.foreach(_.onContainerCompleted(containerStatus)))
-
-  override def onContainersAllocated(containers: java.util.List[Container]): Unit =
-    containers.foreach(container => listeners.foreach(_.onContainerAllocated(container)))
-
-  override def onShutdownRequest: Unit = listeners.foreach(_.onReboot)
-
-  override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit = Unit
-
-  // TODO need to think about meaningful SAMZA's progress
-  override def getProgress: Float = 0.0F
-
-  override def onError(e: Throwable): Unit = {
-    error("Error occured in amClient's callback", e)
-    storedException = e
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
deleted file mode 100644
index 2a5c0d8..0000000
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
-import org.apache.samza.SamzaException
-import org.apache.samza.util.Logging
-
-/**
- * Responsible for managing the lifecycle of the application master. Mostly,
- * this means registering and unregistering with the RM, and shutting down
- * when the RM tells us to Reboot.
- */
-class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppState, amClient: AMRMClientAsync[ContainerRequest]) extends YarnAppMasterListener with Logging {
-  var validResourceRequest = true
-  var shutdownMessage: String = null
-
-  override def onInit() {
-    val host = state.nodeHost
-    val response = amClient.registerApplicationMaster(host, state.rpcUrl.getPort, "%s:%d" format (host, state.trackingUrl.getPort))
-
-    // validate that the YARN cluster can handle our container resource requirements
-    val maxCapability = response.getMaximumResourceCapability
-    val maxMem = maxCapability.getMemory
-    val maxCpu = maxCapability.getVirtualCores
-
-    info("Got AM register response. The YARN RM supports container requests with max-mem: %s, max-cpu: %s" format (maxMem, maxCpu))
-
-    if (containerMem > maxMem || containerCpu > maxCpu) {
-      shutdownMessage = "The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." format (containerMem, containerCpu)
-      error(shutdownMessage)
-      validResourceRequest = false
-      state.status = FinalApplicationStatus.FAILED
-      state.jobHealthy.set(false)
-    }
-  }
-
-  override def onReboot() {
-    throw new SamzaException("Received a reboot signal from the RM, so throwing an exception to reboot the AM.")
-  }
-
-  override def onShutdown() {
-    info("Shutting down.")
-    amClient.unregisterApplicationMaster(state.status, shutdownMessage, null)
-  }
-
-  override def shouldShutdown = !validResourceRequest
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
deleted file mode 100644
index 054d8b6..0000000
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.JvmMetrics
-import org.apache.samza.config.Config
-import org.apache.samza.task.TaskContext
-import org.apache.samza.Partition
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.util.Util
-import org.apache.samza.metrics.ReadableMetricsRegistry
-import org.apache.samza.util.Logging
-import org.apache.samza.SamzaException
-import java.util.Timer
-import java.util.TimerTask
-import org.apache.samza.metrics.MetricsHelper
-
-object SamzaAppMasterMetrics {
-  val sourceName = "ApplicationMaster"
-}
-
-/**
- * Responsible for wiring up Samza's metrics. Given that Samza has a metric
- * registry, we might as well use it. This class takes Samza's application
- * master state, and converts it to metrics.
- */
-class SamzaAppMasterMetrics(
-  val config: Config,
-  val state: SamzaAppState,
-  val registry: ReadableMetricsRegistry) extends MetricsHelper with YarnAppMasterListener with Logging {
-
-  val jvm = new JvmMetrics(registry)
-  val reporters = config.getMetricReporterNames.map(reporterName => {
-    val metricsFactoryClassName = config
-      .getMetricsFactoryClass(reporterName)
-      .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName))
-
-    val reporter =
-      Util
-        .getObj[MetricsReporterFactory](metricsFactoryClassName)
-        .getMetricsReporter(reporterName, SamzaAppMasterMetrics.sourceName, config)
-
-    reporter.register(SamzaAppMasterMetrics.sourceName, registry)
-    (reporterName, reporter)
-  }).toMap
-
-  override def onInit() {
-    val mRunningContainers = newGauge("running-containers", () => state.runningContainers.size)
-    val mNeededContainers = newGauge("needed-containers", () => state.neededContainers.get())
-    val mCompletedContainers = newGauge("completed-containers", () => state.completedContainers.get())
-    val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
-    val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())
-    val mContainers = newGauge("container-count", () => state.containerCount)
-    val mHost = newGauge("http-host", () => state.nodeHost)
-    val mTrackingPort = newGauge("http-port", () => state.trackingUrl.getPort)
-    val mRpcPort = newGauge("rpc-port", () => state.rpcUrl.getPort)
-    val mAppAttemptId = newGauge("app-attempt-id", () => state.appAttemptId.toString)
-    val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0)
-    val mLocalityMatchedRequests = newGauge(
-      "locality-matched",
-      () => {
-        if (state.containerRequests.get() != 0) {
-          state.matchedContainerRequests.get() / state.containerRequests.get()
-        } else {
-          0L
-        }
-      })
-
-    jvm.start
-    reporters.values.foreach(_.start)
-  }
-
-  override def onShutdown() {
-    reporters.values.foreach(_.stop)
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
deleted file mode 100644
index 979d81d..0000000
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
-import org.apache.samza.coordinator.stream.messages.SetConfig
-import org.apache.samza.util.Logging
-import org.apache.samza.config.Config
-import org.apache.samza.metrics.ReadableMetricsRegistry
-import org.apache.samza.SamzaException
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.webapp.ApplicationMasterRestServlet
-import org.apache.samza.webapp.ApplicationMasterWebServlet
-
-/**
-  * Samza's application master runs a very basic HTTP/JSON service to allow
-  * dashboards to check on the status of a job. SamzaAppMasterService starts
-  * up the web service when initialized.
-  * <p />
-  * Besides the HTTP/JSON service endpoints, it also starts an optional
-  * SecurityManager which takes care of the security needs when running in
-  * a secure environment.
-  */
-class SamzaAppMasterService(config: Config, state: SamzaAppState, registry: ReadableMetricsRegistry, clientHelper: ClientHelper, yarnConfiguration: YarnConfiguration) extends YarnAppMasterListener with Logging {
-  var rpcApp: HttpServer = null
-  var webApp: HttpServer = null
-  val SERVER_URL_OPT: String = "samza.autoscaling.server.url"
-  var securityManager: Option[SamzaAppMasterSecurityManager] = None
-
-  override def onInit() {
-    // try starting the samza AM dashboard at a random rpc and tracking port
-    info("Starting webapp at a random rpc and tracking port")
-
-    rpcApp = new HttpServer(resourceBasePath = "scalate")
-    rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
-    rpcApp.start
-
-    webApp = new HttpServer(resourceBasePath = "scalate")
-    webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
-    webApp.start
-
-    state.jobCoordinator.start
-    state.rpcUrl = rpcApp.getUrl
-    state.trackingUrl = webApp.getUrl
-    state.coordinatorUrl = state.jobCoordinator.server.getUrl
-
-    //write server url to coordinator stream
-    val coordinatorStreamWriter: CoordinatorStreamWriter = new CoordinatorStreamWriter(config)
-    coordinatorStreamWriter.start()
-    coordinatorStreamWriter.sendMessage(SetConfig.TYPE, SERVER_URL_OPT, state.coordinatorUrl.toString)
-    coordinatorStreamWriter.stop()
-    debug("sent server url message with value: %s " format state.coordinatorUrl.toString)
-
-    info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" format(state.rpcUrl, state.trackingUrl, state.coordinatorUrl))
-
-    // start YarnSecurityManger for a secure cluster
-    if (UserGroupInformation.isSecurityEnabled) {
-      securityManager = Option {
-        val securityManager = new SamzaAppMasterSecurityManager(config, yarnConfiguration)
-        securityManager.start
-        securityManager
-      }
-    }
-  }
-
-  override def onShutdown() {
-    if (rpcApp != null) {
-      rpcApp.stop
-    }
-
-    if (webApp != null) {
-      webApp.stop
-    }
-
-    state.jobCoordinator.stop
-
-    securityManager.map {
-      securityManager => securityManager.stop
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
index 2ed9baf..c9c1e18 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
@@ -33,7 +33,7 @@ import org.apache.samza.util.Logging
  * when the RM tells us to Reboot.
  */
 //This class is used in the refactored code path as called by run-jc.sh
-class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: YarnAppState, amClient: AMRMClientAsync[ContainerRequest]) extends Logging {
+class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaAppState: SamzaApplicationState, state: YarnAppState, amClient: AMRMClientAsync[ContainerRequest]) extends Logging {
   var validResourceRequest = true
   var shutdownMessage: String = null
   var webApp: SamzaYarnAppMasterService = null
@@ -51,8 +51,8 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: Y
       shutdownMessage = "The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." format (containerMem, containerCpu)
       error(shutdownMessage)
       validResourceRequest = false
-      state.samzaAppState.status = SamzaAppStatus.FAILED;
-      state.samzaAppState.jobHealthy.set(false)
+      samzaAppState.status = SamzaAppStatus.FAILED;
+      samzaAppState.jobHealthy.set(false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
index f62bec1..5f2bfc5 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
@@ -19,42 +19,47 @@
 
 package org.apache.samza.job.yarn
 
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.samza.clustermanager.SamzaApplicationState
 import org.apache.samza.config.Config
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
 import org.apache.samza.coordinator.stream.messages.SetConfig
 import org.apache.samza.metrics.ReadableMetricsRegistry
 import org.apache.samza.util.Logging
+import org.apache.samza.webapp.{ApplicationMasterWebServlet, ApplicationMasterRestServlet}
 
 /**
- * Samza's application master runs a very basic HTTP/JSON service to allow
- * dashboards to check on the status of a job. SamzaAppMasterService starts
- * up the web service when initialized.
- */
+  * Samza's application master runs a very basic HTTP/JSON service to allow
+  * dashboards to check on the status of a job. SamzaAppMasterService starts
+  * up the web service when initialized.
+  */
 //This class is used in the refactored code path as called by run-jc.sh
 
-class SamzaYarnAppMasterService(config: Config, state: YarnAppState, registry: ReadableMetricsRegistry) extends  Logging {
+class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry, yarnConfiguration: YarnConfiguration) extends  Logging {
   var rpcApp: HttpServer = null
   var webApp: HttpServer = null
   val SERVER_URL_OPT: String = "samza.autoscaling.server.url"
+  var securityManager: Option[SamzaAppMasterSecurityManager] = None
 
-   def onInit() {
+  def onInit() {
     // try starting the samza AM dashboard at a random rpc and tracking port
     info("Starting webapp at a random rpc and tracking port")
 
     rpcApp = new HttpServer(resourceBasePath = "scalate")
-    //TODO: Since the state has changed into Samza specific and Yarn specific states, this UI has to be refactored too.
-    //rpcApp.addServlet("/*", refactor ApplicationMasterRestServlet(config, state, registry))
+
+    rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, samzaAppState, state, registry))
     rpcApp.start
 
     webApp = new HttpServer(resourceBasePath = "scalate")
-    //webApp.addServlet("/*", refactor ApplicationMasterWebServlet(config, state))
+    webApp.addServlet("/*", new ApplicationMasterWebServlet(config, samzaAppState, state))
     webApp.start
 
-    state.jobModelManager.start
+    samzaAppState.jobModelManager.start
     state.rpcUrl = rpcApp.getUrl
     state.trackingUrl = webApp.getUrl
-    state.coordinatorUrl = state.jobModelManager.server.getUrl
+    state.coordinatorUrl = samzaAppState.jobModelManager.server.getUrl
 
     //write server url to coordinator stream
     val coordinatorStreamWriter: CoordinatorStreamWriter = new CoordinatorStreamWriter(config)
@@ -64,9 +69,19 @@ class SamzaYarnAppMasterService(config: Config, state: YarnAppState, registry: R
     debug("Sent server url message with value: %s " format state.coordinatorUrl.toString)
 
     info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" format(state.rpcUrl, state.trackingUrl, state.coordinatorUrl))
+
+    // start YarnSecurityManger for a secure cluster
+    if (UserGroupInformation.isSecurityEnabled) {
+      securityManager = Option {
+        val securityManager = new SamzaAppMasterSecurityManager(config, yarnConfiguration)
+        securityManager.start
+        securityManager
+      }
+    }
+
   }
 
-   def onShutdown() {
+  def onShutdown() {
     if (rpcApp != null) {
       rpcApp.stop
     }
@@ -75,6 +90,11 @@ class SamzaYarnAppMasterService(config: Config, state: YarnAppState, registry: R
       webApp.stop
     }
 
-    state.jobModelManager.stop
+    samzaAppState.jobModelManager.stop
+
+    securityManager.map {
+      securityManager => securityManager.stop
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 3ca1f0d..46dc4d1 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -86,11 +86,11 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
     }
     logger.info("Inside YarnJob: fwk_path is %s, ver is %s use it directly " format(fwkPath, fwkVersion))
 
-    var cmdExec = "./__package/bin/run-am.sh" // default location
+    var cmdExec = "./__package/bin/run-jc.sh" // default location
 
     if (!fwkPath.isEmpty()) {
       // if we have framework installed as a separate package - use it
-      cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-am.sh"
+      cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-jc.sh"
 
       logger.info("Using FWK path: " + "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s".
              format(ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, cmdExec,

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index a40ab72..cdd389c 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -19,17 +19,18 @@
 
 package org.apache.samza.webapp
 
+import org.apache.samza.clustermanager.SamzaApplicationState
 import org.scalatra._
 import scalate.ScalateSupport
 import org.apache.samza.config.Config
-import org.apache.samza.job.yarn.{SamzaAppState, ClientHelper}
+import org.apache.samza.job.yarn.{YarnAppState, ClientHelper}
 import org.apache.samza.metrics._
 import scala.collection.JavaConversions._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import java.util.HashMap
 import org.apache.samza.serializers.model.SamzaObjectMapper
 
-class ApplicationMasterRestServlet(config: Config, state: SamzaAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport {
+class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport {
   val yarnConfig = new YarnConfiguration
   val client = new ClientHelper(yarnConfig)
   val jsonMapper = SamzaObjectMapper.getObjectMapper
@@ -78,11 +79,11 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppState, registr
   get("/am") {
     val containers = new HashMap[String, HashMap[String, Object]]
 
-    state.runningContainers.foreach {
+    state.runningYarnContainers.foreach {
       case (containerId, container) =>
         val yarnContainerId = container.id.toString
         val containerMap = new HashMap[String, Object]
-        val taskModels = state.jobCoordinator.jobModel.getContainers.get(containerId).getTasks
+        val taskModels = samzaAppState.jobModelManager.jobModel.getContainers.get(containerId).getTasks
         containerMap.put("yarn-address", container.nodeHttpAddress)
         containerMap.put("start-time", container.startTime.toString)
         containerMap.put("up-time", container.upTime.toString)

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
index 605332a..a32cd65 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
@@ -19,16 +19,17 @@
 
 package org.apache.samza.webapp
 
+import org.apache.samza.clustermanager.SamzaApplicationState
 import org.scalatra._
 import scalate.ScalateSupport
-import org.apache.samza.job.yarn.{SamzaAppState}
+import org.apache.samza.job.yarn.YarnAppState
 import org.apache.samza.config.Config
 import scala.collection.JavaConversions._
 import scala.collection.immutable.TreeMap
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
-class ApplicationMasterWebServlet(config: Config, state: SamzaAppState) extends ScalatraServlet with ScalateSupport {
+class ApplicationMasterWebServlet(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState) extends ScalatraServlet with ScalateSupport {
   val yarnConfig = new YarnConfiguration
 
   before() {
@@ -39,6 +40,7 @@ class ApplicationMasterWebServlet(config: Config, state: SamzaAppState) extends
     layoutTemplate("/WEB-INF/views/index.scaml",
       "config" -> TreeMap(config.sanitize.toMap.toArray: _*),
       "state" -> state,
+      "samzaAppState" -> samzaAppState,
       "rmHttpAddress" -> WebAppUtils.getRMWebAppURLWithScheme(yarnConfig))
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
deleted file mode 100644
index e21aded..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.job.yarn.util.MockContainerRequestState;
-import org.apache.samza.job.yarn.util.TestUtil;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class TestContainerAllocator extends TestContainerAllocatorCommon {
-
-  private final Config config = new MapConfig(new HashMap<String, String>() {
-    {
-      put("yarn.container.count", "1");
-      put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
-      put("yarn.container.memory.mb", "512");
-      put("yarn.package.path", "/foo");
-      put("task.inputs", "test-system.test-stream");
-      put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
-      put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-      put("yarn.container.retry.count", "1");
-      put("yarn.container.retry.window.ms", "1999999999");
-      put("yarn.container.request.timeout.ms", "3");
-      put("yarn.allocator.sleep.ms", "1");
-    }
-  });
-
-  @Override
-  protected Config getConfig() {
-    return config;
-  }
-
-  @Override
-  protected MockContainerRequestState createContainerRequestState(
-      AMRMClientAsync<AMRMClient.ContainerRequest> amClient) {
-    return new MockContainerRequestState(amClient, false);
-  }
-
-  /**
-   * Test request containers with no containerToHostMapping makes the right number of requests
-   */
-  @Test
-  public void testRequestContainersWithNoMapping() throws Exception {
-    int containerCount = 4;
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>();
-    for (int i = 0; i < containerCount; i++) {
-      containersToHostMapping.put(i, null);
-    }
-    allocatorThread.start();
-
-    containerAllocator.requestContainers(containersToHostMapping);
-
-    assertNotNull(requestState);
-
-    assertNotNull(requestState.getRequestsQueue());
-    assertTrue(requestState.getRequestsQueue().size() == 4);
-
-    // If host-affinty is not enabled, it doesn't update the requestMap
-    assertNotNull(requestState.getRequestsToCountMap());
-    assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0);
-  }
-  /**
-   * Adds all containers returned to ANY_HOST only
-   */
-  @Test
-  public void testAddContainer() throws Exception {
-    assertNull(requestState.getContainersOnAHost("host1"));
-    assertNull(requestState.getContainersOnAHost(ANY_HOST));
-
-    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123));
-    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host3", 123));
-
-    assertNull(requestState.getContainersOnAHost("host1"));
-    assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
-    assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
-  }
-
-  /**
-   * Test requestContainers
-   */
-  @Test
-  public void testRequestContainers() throws Exception {
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
-      {
-        put(0, "host1");
-        put(1, "host2");
-        put(2, null);
-        put(3, "host1");
-      }
-    };
-
-    allocatorThread.start();
-
-    containerAllocator.requestContainers(containersToHostMapping);
-
-    assertNotNull(testAMRMClient.requests);
-    assertEquals(4, testAMRMClient.requests.size());
-
-    assertNotNull(requestState);
-
-    assertNotNull(requestState.getRequestsQueue());
-    assertTrue(requestState.getRequestsQueue().size() == 4);
-
-    // If host-affinty is not enabled, it doesn't update the requestMap
-    assertNotNull(requestState.getRequestsToCountMap());
-    assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
deleted file mode 100644
index 0bbd48d..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.job.yarn.util.MockContainerListener;
-import org.apache.samza.job.yarn.util.MockContainerRequestState;
-import org.apache.samza.job.yarn.util.MockContainerUtil;
-import org.apache.samza.job.yarn.util.MockHttpServer;
-import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
-import org.apache.samza.job.yarn.util.TestUtil;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Handles all common fields/tests for ContainerAllocators.
- */
-public abstract class TestContainerAllocatorCommon {
-  protected static final String ANY_HOST = ContainerRequestState.ANY_HOST;
-
-  protected final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
-
-  protected AMRMClientAsyncImpl amRmClientAsync;
-  protected TestAMRMClientImpl testAMRMClient;
-  protected MockContainerRequestState requestState;
-  protected AbstractContainerAllocator containerAllocator;
-  protected Thread allocatorThread;
-  protected ContainerUtil containerUtil;
-
-  protected SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2);
-
-  protected abstract Config getConfig();
-  protected abstract MockContainerRequestState createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient);
-
-  private JobModelManager getCoordinator(int containerCount) {
-    Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
-    for (int i = 0; i < containerCount; i++) {
-      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
-      containers.put(i, container);
-    }
-    JobModel jobModel = new JobModel(getConfig(), containers);
-    return new JobModelManager(jobModel, server, null);
-  }
-
-
-  @Before
-  public void setup() throws Exception {
-    // Create AMRMClient
-    testAMRMClient = new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-            false,
-            new ArrayList<Container>(),
-            new ArrayList<ContainerStatus>()
-        ));
-    amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
-
-    // Initialize certain state variables
-    state.coordinatorUrl = new URL("http://localhost:7778/");
-
-    containerUtil = TestUtil.getContainerUtil(getConfig(), state);
-
-    requestState = createContainerRequestState(amRmClientAsync);
-    containerAllocator = new HostAwareContainerAllocator(
-        amRmClientAsync,
-        containerUtil,
-        new YarnConfig(getConfig())
-    );
-    Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState");
-    requestStateField.setAccessible(true);
-    requestStateField.set(containerAllocator, requestState);
-
-    allocatorThread = new Thread(containerAllocator);
-  }
-
-  @After
-  public void teardown() throws Exception {
-    containerAllocator.setIsRunning(false);
-    allocatorThread.join();
-  }
-
-  /**
-   * If the container fails to start e.g because it fails to connect to a NM on a host that
-   * is down, the allocator should request a new container on a different host.
-   */
-  @Test
-  public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-    final Container container = TestUtil
-        .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "host2", 123);
-    final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123);
-
-    ((MockContainerUtil) containerUtil).containerStartException = new IOException("You shall not... connect to the NM!");
-
-    Runnable releasedContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        // The failed container should be released. The successful one should not.
-        assertNotNull(testAMRMClient.getRelease());
-        assertEquals(1, testAMRMClient.getRelease().size());
-        assertTrue(testAMRMClient.getRelease().contains(container.getId()));
-      }
-    };
-
-    Runnable assignedContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        // Test that the first request assignment had a preferred host and the retry didn't
-        assertEquals(2, requestState.assignedRequests.size());
-
-        SamzaContainerRequest request = requestState.assignedRequests.remove();
-        assertEquals(0, request.expectedContainerId);
-        assertEquals("host2", request.getPreferredHost());
-
-        request = requestState.assignedRequests.remove();
-        assertEquals(0, request.expectedContainerId);
-        assertEquals("ANY_HOST", request.getPreferredHost());
-
-        // This routine should be called after the retry is assigned, but before it's started.
-        // So there should still be 1 container needed because neededContainers should not be decremented for a failed start.
-        assertEquals(1, state.neededContainers.get());
-      }
-    };
-
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, releasedContainerAssertions, assignedContainerAssertions, null);
-    requestState.registerContainerListener(listener);
-    state.neededContainers.set(1); // Normally this would be done in the SamzaTaskManager
-
-    // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
-    containerAllocator.requestContainer(0, "host2");
-    containerAllocator.addContainer(container);
-    containerAllocator.addContainer(container1);
-
-    allocatorThread.start();
-
-    listener.verify();
-  }
-
-
-  /**
-   * Extra allocated containers that are returned by the RM and unused by the AM should be released.
-   * Containers are considered "extra" only when there are no more pending requests to fulfill
-   * @throws Exception
-   */
-  @Test
-  public void testAllocatorReleasesExtraContainers() throws Exception {
-    final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "host1", 123);
-    final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123);
-    final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host2", 123);
-
-    Runnable releasedContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        assertNotNull(testAMRMClient.getRelease());
-        assertEquals(2, testAMRMClient.getRelease().size());
-        assertTrue(testAMRMClient.getRelease().contains(container1.getId()));
-        assertTrue(testAMRMClient.getRelease().contains(container2.getId()));
-
-        // Test that state is cleaned up
-        assertEquals(0, requestState.getRequestsQueue().size());
-        assertEquals(0, requestState.getRequestsToCountMap().size());
-        assertNull(requestState.getContainersOnAHost("host1"));
-        assertNull(requestState.getContainersOnAHost("host2"));
-      }
-    };
-
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(3, 2, 0, 0, null, releasedContainerAssertions, null, null);
-    requestState.registerContainerListener(listener);
-
-    containerAllocator.requestContainer(0, "host1");
-
-    containerAllocator.addContainer(container);
-    containerAllocator.addContainer(container1);
-    containerAllocator.addContainer(container2);
-
-    allocatorThread.start();
-
-    listener.verify();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java
deleted file mode 100644
index 402fe78..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.job.yarn.util.TestAMRMClientImpl;
-import org.apache.samza.job.yarn.util.TestUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-import static org.junit.Assert.*;
-
-public class TestContainerRequestState {
-  private AMRMClientAsyncImpl amRmClientAsync;
-  private TestAMRMClientImpl testAMRMClient;
-  private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
-
-  @Before
-  public void setup() {
-    // Create AMRMClient
-    testAMRMClient = new TestAMRMClientImpl(
-        TestUtil.getAppMasterResponse(
-            false,
-            new ArrayList<Container>(),
-            new ArrayList<ContainerStatus>()
-        ));
-    amRmClientAsync = TestUtil.getAMClient(testAMRMClient);
-  }
-
-  /**
-   * Test state after a request is submitted
-   */
-  @Test
-  public void testUpdateRequestState() {
-    // Host-affinity is enabled
-    ContainerRequestState state = new ContainerRequestState(amRmClientAsync, true);
-    SamzaContainerRequest request = new SamzaContainerRequest(0, "abc");
-    state.updateRequestState(request);
-
-    assertNotNull(testAMRMClient.requests);
-    assertEquals(1, testAMRMClient.requests.size());
-    assertEquals(request.getIssuedRequest(), testAMRMClient.requests.get(0));
-
-    assertNotNull(state.getRequestsQueue());
-    assertTrue(state.getRequestsQueue().size() == 1);
-
-    assertNotNull(state.getRequestsToCountMap());
-    assertNotNull(state.getRequestsToCountMap().get("abc"));
-    assertEquals(1, state.getRequestsToCountMap().get("abc").get());
-
-    assertNotNull(state.getContainersOnAHost("abc"));
-    assertEquals(0, state.getContainersOnAHost("abc").size());
-
-    // Host-affinity is not enabled
-    ContainerRequestState state1 = new ContainerRequestState(amRmClientAsync, false);
-    SamzaContainerRequest request1 = new SamzaContainerRequest(1, null);
-    state1.updateRequestState(request1);
-
-    assertNotNull(testAMRMClient.requests);
-    assertEquals(2, testAMRMClient.requests.size());
-
-    AMRMClient.ContainerRequest expectedContainerRequest = request1.getIssuedRequest();
-    AMRMClient.ContainerRequest actualContainerRequest = testAMRMClient.requests.get(1);
-
-    assertEquals(expectedContainerRequest.getCapability(), actualContainerRequest.getCapability());
-    assertEquals(expectedContainerRequest.getPriority(), actualContainerRequest.getPriority());
-    assertNull(actualContainerRequest.getNodes());
-
-    assertNotNull(state1.getRequestsQueue());
-    assertTrue(state1.getRequestsQueue().size() == 1);
-
-    assertNotNull(state1.getRequestsToCountMap());
-    assertNull(state1.getRequestsToCountMap().get(ANY_HOST));
-
-  }
-
-  /**
-   * Test addContainer() updates the state correctly
-   */
-  @Test
-  public void testAddContainer() {
-    // Add container to ANY_LIST when host-affinity is not enabled
-    ContainerRequestState state = new ContainerRequestState(amRmClientAsync, false);
-    Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123);
-    state.addContainer(container);
-
-    assertNotNull(state.getRequestsQueue());
-    assertNotNull(state.getRequestsToCountMap());
-    assertNotNull(state.getContainersOnAHost(ANY_HOST));
-
-    assertEquals(1, state.getContainersOnAHost(ANY_HOST).size());
-    assertEquals(container, state.getContainersOnAHost(ANY_HOST).get(0));
-
-    // Container Allocated when there is no request in queue
-    ContainerRequestState state1 = new ContainerRequestState(amRmClientAsync, true);
-    Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "zzz", 123);
-    state1.addContainer(container1);
-
-    assertNotNull(state1.getRequestsQueue());
-    assertEquals(0, state1.getRequestsQueue().size());
-
-    assertNull(state1.getContainersOnAHost("zzz"));
-    assertNotNull(state1.getContainersOnAHost(ANY_HOST));
-    assertEquals(1, state1.getContainersOnAHost(ANY_HOST).size());
-    assertEquals(container1, state1.getContainersOnAHost(ANY_HOST).get(0));
-
-    // Container Allocated on a Requested Host
-    state1.updateRequestState(new SamzaContainerRequest(0, "abc"));
-
-    assertNotNull(state1.getRequestsQueue());
-    assertEquals(1, state1.getRequestsQueue().size());
-
-    assertNotNull(state1.getRequestsToCountMap());
-    assertNotNull(state1.getRequestsToCountMap().get("abc"));
-    assertEquals(1, state1.getRequestsToCountMap().get("abc").get());
-
-    state1.addContainer(container);
-
-    assertNotNull(state1.getContainersOnAHost("abc"));
-    assertEquals(1, state1.getContainersOnAHost("abc").size());
-    assertEquals(container, state1.getContainersOnAHost("abc").get(0));
-
-    // Container Allocated on host that was not requested
-    Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), "xyz", 123);
-    state1.addContainer(container2);
-
-    assertNull(state1.getContainersOnAHost("xyz"));
-    assertNotNull(state1.getContainersOnAHost(ANY_HOST));
-    assertEquals(2, state1.getContainersOnAHost(ANY_HOST).size());
-    assertEquals(container2, state1.getContainersOnAHost(ANY_HOST).get(1));
-
-    // Extra containers were allocated on a host that was requested
-    Container container3 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000005"), "abc", 123);
-    state1.addContainer(container3);
-
-    assertEquals(3, state1.getContainersOnAHost(ANY_HOST).size());
-    assertEquals(container3, state1.getContainersOnAHost(ANY_HOST).get(2));
-  }
-
-  /**
-   * Test request state after container is assigned to a host
-   * * Assigned on requested host
-   * * Assigned on any host
-   */
-  @Test
-  public void testContainerAssignment() throws Exception {
-    // Host-affinity enabled
-    ContainerRequestState state = new ContainerRequestState(amRmClientAsync, true);
-    SamzaContainerRequest request = new SamzaContainerRequest(0, "abc");
-    SamzaContainerRequest request1 = new SamzaContainerRequest(0, "def");
-
-    state.updateRequestState(request);
-    state.updateRequestState(request1);
-
-    Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123);
-    Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "zzz", 123);
-    state.addContainer(container);
-    state.addContainer(container1);
-
-    assertEquals(2, state.getRequestsQueue().size());
-    assertEquals(2, state.getRequestsToCountMap().size());
-
-    assertNotNull(state.getContainersOnAHost("abc"));
-    assertEquals(1, state.getContainersOnAHost("abc").size());
-    assertEquals(container, state.getContainersOnAHost("abc").get(0));
-
-    assertNotNull(state.getContainersOnAHost("def"));
-    assertEquals(0, state.getContainersOnAHost("def").size());
-
-    assertNotNull(state.getContainersOnAHost(ANY_HOST));
-    assertEquals(1, state.getContainersOnAHost(ANY_HOST).size());
-    assertEquals(container1, state.getContainersOnAHost(ANY_HOST).get(0));
-
-    // Container assigned on the requested host
-    state.updateStateAfterAssignment(request, "abc", container);
-
-    assertEquals(1, state.getRequestsQueue().size());
-    assertEquals(request1, state.getRequestsQueue().peek());
-
-    assertNotNull(state.getRequestsToCountMap().get("abc"));
-    assertEquals(0, state.getRequestsToCountMap().get("abc").get());
-
-    assertNotNull(state.getContainersOnAHost("abc"));
-    assertEquals(0, state.getContainersOnAHost("abc").size());
-
-    // Container assigned on any host
-    state.updateStateAfterAssignment(request1, ANY_HOST, container1);
-
-    assertEquals(0, state.getRequestsQueue().size());
-
-    assertNotNull(state.getRequestsToCountMap().get("def"));
-    assertEquals(0, state.getRequestsToCountMap().get("def").get());
-
-    assertNotNull(state.getContainersOnAHost(ANY_HOST));
-    assertEquals(0, state.getContainersOnAHost(ANY_HOST).size());
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
deleted file mode 100644
index ead7200..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.job.yarn.util.MockContainerListener;
-import org.apache.samza.job.yarn.util.MockContainerRequestState;
-import org.apache.samza.job.yarn.util.MockContainerUtil;
-import org.apache.samza.job.yarn.util.TestUtil;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class TestHostAwareContainerAllocator extends TestContainerAllocatorCommon {
-
-  private final Config config = new MapConfig(new HashMap<String, String>() {
-    {
-      put("yarn.container.count", "1");
-      put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
-      put("yarn.container.memory.mb", "512");
-      put("yarn.package.path", "/foo");
-      put("task.inputs", "test-system.test-stream");
-      put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
-      put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-      put("yarn.container.retry.count", "1");
-      put("yarn.container.retry.window.ms", "1999999999");
-      put("yarn.samza.host-affinity.enabled", "true");
-      put("yarn.container.request.timeout.ms", "3");
-      put("yarn.allocator.sleep.ms", "1");
-    }
-  });
-
-  @Override
-  protected Config getConfig() {
-    return config;
-  }
-
-  @Override
-  protected MockContainerRequestState createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient) {
-    return new MockContainerRequestState(amClient, true);
-  }
-
-  /**
-   * Test request containers with no containerToHostMapping makes the right number of requests
-   */
-  @Test
-  public void testRequestContainersWithNoMapping() throws Exception {
-    int containerCount = 4;
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>();
-    for (int i = 0; i < containerCount; i++) {
-      containersToHostMapping.put(i, null);
-    }
-
-    allocatorThread.start();
-
-    containerAllocator.requestContainers(containersToHostMapping);
-
-    assertNotNull(requestState);
-
-    assertNotNull(requestState.getRequestsQueue());
-    assertEquals(4, requestState.getRequestsQueue().size());
-
-    assertNotNull(requestState.getRequestsToCountMap());
-    assertEquals(1, requestState.getRequestsToCountMap().keySet().size());
-    assertTrue(requestState.getRequestsToCountMap().keySet().contains(ANY_HOST));
-  }
-
-  /**
-   * Add containers to the correct host in the request state
-   */
-  @Test
-  public void testAddContainerWithHostAffinity() throws Exception {
-    containerAllocator.requestContainers(new HashMap<Integer, String>() {
-      {
-        put(0, "host1");
-        put(1, "host3");
-      }
-    });
-
-    assertNotNull(requestState.getContainersOnAHost("host1"));
-    assertEquals(0, requestState.getContainersOnAHost("host1").size());
-
-    assertNotNull(requestState.getContainersOnAHost("host3"));
-    assertEquals(0, requestState.getContainersOnAHost("host3").size());
-
-    assertNull(requestState.getContainersOnAHost(ANY_HOST));
-
-    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123));
-    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), "host2", 123));
-    containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host3", 123));
-
-    assertNotNull(requestState.getContainersOnAHost("host1"));
-    assertEquals(1, requestState.getContainersOnAHost("host1").size());
-
-    assertNotNull(requestState.getContainersOnAHost("host3"));
-    assertEquals(1, requestState.getContainersOnAHost("host3").size());
-
-    assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
-    assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 1);
-    assertEquals(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"),
-        requestState.getContainersOnAHost(ANY_HOST).get(0).getId());
-  }
-
-
-  @Test
-  public void testRequestContainers() throws Exception {
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
-      {
-        put(0, "host1");
-        put(1, "host2");
-        put(2, null);
-        put(3, "host1");
-      }
-    };
-    allocatorThread.start();
-
-    containerAllocator.requestContainers(containersToHostMapping);
-
-    assertNotNull(testAMRMClient.requests);
-    assertEquals(4, testAMRMClient.requests.size());
-
-    assertNotNull(requestState);
-
-    assertNotNull(requestState.getRequestsQueue());
-    assertTrue(requestState.getRequestsQueue().size() == 4);
-
-    assertNotNull(requestState.getRequestsToCountMap());
-    Map<String, AtomicInteger> requestsMap = requestState.getRequestsToCountMap();
-
-    assertNotNull(requestsMap.get("host1"));
-    assertEquals(2, requestsMap.get("host1").get());
-
-    assertNotNull(requestsMap.get("host2"));
-    assertEquals(1, requestsMap.get("host2").get());
-
-    assertNotNull(requestsMap.get(ANY_HOST));
-    assertEquals(1, requestsMap.get(ANY_HOST).get());
-  }
-
-  /**
-   * Handles expired requests correctly and assigns ANY_HOST
-   */
-  @Test
-  public void testExpiredRequestHandling() throws Exception {
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
-      {
-        put(0, "requestedHost1");
-        put(1, "requestedHost2");
-      }
-    };
-    containerAllocator.requestContainers(containersToHostMapping);
-
-    assertNotNull(requestState.getRequestsQueue());
-    assertTrue(requestState.getRequestsQueue().size() == 2);
-
-    assertNotNull(requestState.getRequestsToCountMap());
-    assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1"));
-    assertTrue(requestState.getRequestsToCountMap().get("requestedHost1").get() == 1);
-
-    assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2"));
-    assertTrue(requestState.getRequestsToCountMap().get("requestedHost2").get() == 1);
-
-    final Container container0 = TestUtil
-        .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "availableHost1", 123);
-    final Container container1 = TestUtil
-        .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "availableHost2", 123);
-
-    Runnable addedContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        assertNotNull(requestState.getRequestsToCountMap());
-        assertNull(requestState.getContainersOnAHost("availableHost1"));
-        assertNull(requestState.getContainersOnAHost("availableHost2"));
-        assertNotNull(requestState.getContainersOnAHost(ANY_HOST));
-        assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2);
-      }
-    };
-
-    Runnable assignedContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        List<Container> anyHostContainers = requestState.getContainersOnAHost(ANY_HOST);
-        assertTrue(anyHostContainers == null || anyHostContainers.isEmpty());
-
-        assertNotNull(requestState.getRequestsQueue());
-        assertTrue(requestState.getRequestsQueue().size() == 0);
-
-        assertNotNull(requestState.getRequestsToCountMap());
-        assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1"));
-        assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2"));
-      }
-    };
-
-    Runnable runningContainerAssertions = new Runnable() {
-      @Override
-      public void run() {
-        MockContainerUtil mockContainerUtil = (MockContainerUtil) containerUtil;
-
-        assertNotNull(mockContainerUtil.runningContainerList.get("availableHost1"));
-        assertTrue(mockContainerUtil.runningContainerList.get("availableHost1").contains(container0));
-
-        assertNotNull(mockContainerUtil.runningContainerList.get("availableHost2"));
-        assertTrue(mockContainerUtil.runningContainerList.get("availableHost2").contains(container1));
-      }
-    };
-    // Set up our final asserts before starting the allocator thread
-    MockContainerListener listener = new MockContainerListener(
-        2, 0, 2, 2,
-        addedContainerAssertions,
-        null,
-        assignedContainerAssertions,
-        runningContainerAssertions);
-    requestState.registerContainerListener(listener);
-    ((MockContainerUtil) containerUtil).registerContainerListener(listener);
-
-    containerAllocator.addContainer(container0);
-    containerAllocator.addContainer(container1);
-
-    // Start after adding containers to avoid a race condition between the allocator thread
-    // using the containers and the assertions after the containers are added.
-    allocatorThread.start();
-
-    listener.verify();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java
deleted file mode 100644
index ad0f4d3..0000000
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class TestSamzaContainerRequest {
-  private static final String ANY_HOST = ContainerRequestState.ANY_HOST;
-
-  @Test
-  public void testPreferredHostIsNeverNull() {
-    SamzaContainerRequest request = new SamzaContainerRequest(0, null);
-
-    assertNotNull(request.getPreferredHost());
-
-    // preferredHost is null, it should automatically default to ANY_HOST
-    assertTrue(request.getPreferredHost().equals(ANY_HOST));
-
-    SamzaContainerRequest request1 = new SamzaContainerRequest(1, "abc");
-    assertNotNull(request1.getPreferredHost());
-    assertTrue(request1.getPreferredHost().equals("abc"));
-  }
-
-  @Test
-  public void testAnyHostIsNotPassedToYarnRequest() {
-    SamzaContainerRequest request = new SamzaContainerRequest(0, null);
-    assertNull(request.getIssuedRequest().getNodes());
-
-    SamzaContainerRequest request1 = new SamzaContainerRequest(1, ANY_HOST);
-    assertNull(request1.getIssuedRequest().getNodes());
-  }
-}