You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/03/20 17:56:03 UTC

[1/2] samza git commit: SAMZA-1154: Tasks endpoint to list the complete details of all tasks related to a job

Repository: samza
Updated Branches:
  refs/heads/master d399d6f3c -> 26280cac2


http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java
new file mode 100644
index 0000000..9194d5a
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/Responses.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import java.util.Collections;
+import javax.ws.rs.core.Response;
+
+/**
+ * This is a helper class that holds the methods that are reusable
+ * across the different samza-rest resource endpoints.
+ */
+public class Responses {
+
+  private Responses() {
+  }
+
+  /**
+   * Constructs a consistent format for error responses.
+   *
+   * @param message the error message to report.
+   * @return        the {@link Response} containing the error message.
+   */
+  public static Response errorResponse(String message) {
+    return Response.serverError().entity(Collections.singletonMap("message", message)).build();
+  }
+
+  /**
+   * Constructs a bad request (HTTP 400) response.
+   *
+   * @param message the bad request message to report.
+   * @return        the {@link Response} containing the message.
+   */
+  public static Response badRequestResponse(String message) {
+    return Response.status(Response.Status.BAD_REQUEST).entity(Collections.singletonMap("message", message)).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java
new file mode 100644
index 0000000..301c202
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import com.google.common.base.Preconditions;
+import javax.inject.Singleton;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.rest.model.Task;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.rest.proxy.task.TaskProxyFactory;
+import org.apache.samza.rest.proxy.task.SamzaTaskProxy;
+import org.apache.samza.rest.proxy.task.TaskProxy;
+import org.apache.samza.rest.proxy.task.TaskResourceConfig;
+import org.apache.samza.util.ClassLoaderHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The REST resource for tasks. Handles the requests that are at the tasks scope.
+ */
+@Singleton
+@Path("/v1/jobs")
+public class TasksResource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TasksResource.class);
+
+  private final TaskProxy taskProxy;
+
+  /**
+   * Initializes a TaskResource with {@link TaskProxy} from the
+   * {@link TaskProxyFactory} class specified in the configuration.
+   *
+   * @param config  the configuration containing the {@link TaskProxyFactory} class.
+   */
+  public TasksResource(TaskResourceConfig config) {
+    String taskProxyFactory = config.getTaskProxyFactory();
+    Preconditions.checkArgument(StringUtils.isNotEmpty(taskProxyFactory),
+                                String.format("Missing config: %s", TaskResourceConfig.CONFIG_TASK_PROXY_FACTORY));
+    try {
+      TaskProxyFactory factory = ClassLoaderHelper.fromClassName(taskProxyFactory);
+      taskProxy = factory.getTaskProxy(config);
+    } catch (Exception e) {
+      LOG.error(String.format("Exception in building TasksResource with config: %s.", config), e);
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * Gets the list of {@link Task} for the job instance specified by jobName and jobId.
+   * @param jobName the name of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_NAME}
+   * @param jobId the id of the job as configured in {@link org.apache.samza.config.JobConfig#JOB_ID}.
+   * @return a {@link javax.ws.rs.core.Response.Status#OK} {@link javax.ws.rs.core.Response}
+   *         contains a list of {@link Task}, where each task belongs to
+   *         the samza job. {@link javax.ws.rs.core.Response.Status#BAD_REQUEST} is returned for invalid
+   *         job instances.
+   */
+  @GET
+  @Path("/{jobName}/{jobId}/tasks")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getTasks(
+      @PathParam("jobName") final String jobName,
+      @PathParam("jobId") final String jobId) {
+    try {
+      return Response.ok(taskProxy.getTasks(new JobInstance(jobName, jobId))).build();
+    } catch (IllegalArgumentException e) {
+      String message = String.format("Invalid arguments for getTasks. jobName: %s, jobId: %s.", jobName, jobId);
+      LOG.error(message, e);
+      return Responses.badRequestResponse(message);
+    } catch (Exception e) {
+      LOG.error(String.format("Error in getTasks with arguments jobName: %s, jobId: %s.", jobName, jobId), e);
+      return Responses.errorResponse(e.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
index 7db437b..2a051c4 100644
--- a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestJobsResource.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.rest.resources;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.Application;
@@ -31,6 +31,8 @@ import org.apache.samza.rest.SamzaRestConfig;
 import org.apache.samza.rest.model.Job;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.resources.mock.MockJobProxy;
+import org.apache.samza.rest.resources.mock.MockJobProxyFactory;
+import org.apache.samza.rest.resources.mock.MockResourceFactory;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
@@ -48,10 +50,11 @@ public class TestJobsResource extends JerseyTest {
 
   @Override
   protected Application configure() {
-    Map<String, String> map = new HashMap<>();
-    map.put(JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY, "org.apache.samza.rest.resources.mock.MockJobProxyFactory");
-    map.put(JobsResourceConfig.CONFIG_JOB_INSTALLATIONS_PATH, ".");
-    SamzaRestConfig config = new SamzaRestConfig(new MapConfig(map));
+    Map<String, String> configMap = ImmutableMap.of(JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY,
+                                                    MockJobProxyFactory.class.getName(),
+                                                    SamzaRestConfig.CONFIG_REST_RESOURCE_FACTORIES,
+                                                    MockResourceFactory.class.getName());
+    SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
     return new SamzaRestApplication(config);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
new file mode 100644
index 0000000..63a9958
--- /dev/null
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.rest.SamzaRestApplication;
+import org.apache.samza.rest.SamzaRestConfig;
+import org.apache.samza.rest.model.Partition;
+import org.apache.samza.rest.model.Task;
+import org.apache.samza.rest.proxy.task.TaskResourceConfig;
+import org.apache.samza.rest.resources.mock.MockJobProxy;
+import org.apache.samza.rest.resources.mock.MockResourceFactory;
+import org.apache.samza.rest.resources.mock.MockTaskProxy;
+import org.apache.samza.rest.resources.mock.MockTaskProxyFactory;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestTasksResource extends JerseyTest {
+  ObjectMapper objectMapper = SamzaObjectMapper.getObjectMapper();
+
+  @Override
+  protected Application configure() {
+    Map<String, String> configMap = ImmutableMap.of(TaskResourceConfig.CONFIG_TASK_PROXY_FACTORY,
+                                                    MockTaskProxyFactory.class.getName(),
+                                                    SamzaRestConfig.CONFIG_REST_RESOURCE_FACTORIES,
+                                                    MockResourceFactory.class.getName());
+    SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
+    return new SamzaRestApplication(config);
+  }
+
+  @Test
+  public void testGetTasks()
+      throws IOException {
+    String requestUrl = String.format("v1/jobs/%s/%s/tasks", "testJobName", "testJobId");
+    Response response = target(requestUrl).request().get();
+    assertEquals(200, response.getStatus());
+    Task[] tasks = objectMapper.readValue(response.readEntity(String.class), Task[].class);
+    assertEquals(2, tasks.length);
+    List<Partition> partitionList = ImmutableList.of(new Partition(MockTaskProxy.SYSTEM_NAME,
+                                                                   MockTaskProxy.STREAM_NAME,
+                                                                   MockTaskProxy.PARTITION_ID));
+
+    assertEquals(null, tasks[0].getPreferredHost());
+    assertEquals(MockTaskProxy.TASK_1_CONTAINER_ID, tasks[0].getContainerId());
+    assertEquals(MockTaskProxy.TASK_1_NAME, tasks[0].getTaskName());
+    assertEquals(partitionList, tasks[0].getPartitions());
+
+    assertEquals(null, tasks[1].getPreferredHost());
+    assertEquals(MockTaskProxy.TASK_2_CONTAINER_ID, tasks[1].getContainerId());
+    assertEquals(MockTaskProxy.TASK_2_NAME, tasks[1].getTaskName());
+    assertEquals(partitionList, tasks[1].getPartitions());
+  }
+
+  @Test
+  public void testGetTasksWithInvalidJobName()
+      throws IOException {
+    String requestUrl = String.format("v1/jobs/%s/%s/tasks", "BadJobName", MockJobProxy.JOB_INSTANCE_4_ID);
+    Response resp = target(requestUrl).request().get();
+    assertEquals(400, resp.getStatus());
+    final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference<Map<String, String>>() {});
+    assertTrue(errorMessage.get("message"), errorMessage.get("message").contains("Invalid arguments for getTasks. "));
+    resp.close();
+  }
+
+  @Test
+  public void testGetTasksWithInvalidJobId()
+      throws IOException {
+    String requestUrl = String.format("v1/jobs/%s/%s/tasks", MockJobProxy.JOB_INSTANCE_1_NAME, "BadJobId");
+    Response resp = target(requestUrl).request().get();
+    assertEquals(400, resp.getStatus());
+    final Map<String, String> errorMessage = objectMapper.readValue(resp.readEntity(String.class), new TypeReference<Map<String, String>>() {});
+    assertTrue(errorMessage.get("message"), errorMessage.get("message").contains("Invalid arguments for getTasks. "));
+    resp.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java
new file mode 100644
index 0000000..aa6a0ba
--- /dev/null
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.rest.proxy.installation.InstallationFinder;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.apache.samza.rest.proxy.job.JobInstance;
+
+
+public class MockInstallationFinder implements InstallationFinder {
+
+  @Override
+  public boolean isInstalled(JobInstance jobInstance) {
+    return true;
+  }
+
+  @Override
+  public Map<JobInstance, InstallationRecord> getAllInstalledJobs() {
+    return new HashMap<>();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockResourceFactory.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockResourceFactory.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockResourceFactory.java
new file mode 100644
index 0000000..c4c0e94
--- /dev/null
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockResourceFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources.mock;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.config.Config;
+import org.apache.samza.rest.proxy.task.TaskResourceConfig;
+import org.apache.samza.rest.resources.JobsResource;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.apache.samza.rest.resources.ResourceFactory;
+import org.apache.samza.rest.resources.TasksResource;
+
+
+public class MockResourceFactory implements ResourceFactory {
+
+  @Override
+  public List<? extends Object> getResourceInstances(Config config) {
+    List<Object> resources = new ArrayList<>();
+    if (config.containsKey(JobsResourceConfig.CONFIG_JOB_PROXY_FACTORY)) {
+      resources.add(new JobsResource(new JobsResourceConfig(config)));
+    }
+    if (config.containsKey(TaskResourceConfig.CONFIG_TASK_PROXY_FACTORY)) {
+      resources.add(new TasksResource(new TaskResourceConfig(config)));
+    }
+    return resources;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
new file mode 100644
index 0000000..45f252a
--- /dev/null
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources.mock;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.rest.proxy.task.SamzaTaskProxy;
+import org.apache.samza.rest.proxy.task.TaskResourceConfig;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+public class MockTaskProxy extends SamzaTaskProxy {
+  public static final String SYSTEM_NAME = "testSystem";
+  public static final String STREAM_NAME = "testStream";
+  public static final Integer PARTITION_ID = 1;
+  public static final Set<SystemStreamPartition> SYSTEM_STREAM_PARTITIONS = ImmutableSet.of(
+      new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME, new Partition(PARTITION_ID)));
+
+  public static final String TASK_1_NAME = "Task1";
+  public static final int TASK_1_CONTAINER_ID = 1;
+  public static final Partition CHANGE_LOG_PARTITION = new Partition(0);
+
+  public static final String TASK_2_NAME = "Task2";
+  public static final int TASK_2_CONTAINER_ID = 2;
+
+  public MockTaskProxy() {
+    super(new TaskResourceConfig(new MapConfig()),
+          new MockInstallationFinder());
+  }
+
+  @Override
+  protected JobModel getJobModel(JobInstance jobInstance) {
+    if (jobInstance.getJobId().contains("Bad")
+        || jobInstance.getJobName().contains("Bad")) {
+      throw new IllegalArgumentException("No tasks found.");
+    }
+    TaskModel task1Model = new TaskModel(new TaskName(TASK_1_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION);
+    TaskModel task2Model = new TaskModel(new TaskName(TASK_2_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION);
+    ContainerModel task1ContainerModel = new ContainerModel(TASK_1_CONTAINER_ID,
+                                                            ImmutableMap.of(new TaskName(TASK_1_NAME),
+                                                                            task1Model));
+    ContainerModel task2ContainerModel = new ContainerModel(TASK_2_CONTAINER_ID,
+                                                            ImmutableMap.of(new TaskName(TASK_2_NAME),
+                                                                            task2Model));
+    return new JobModel(new MapConfig(), ImmutableMap.of(TASK_1_CONTAINER_ID, task1ContainerModel,
+                                                         TASK_2_CONTAINER_ID, task2ContainerModel));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxyFactory.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxyFactory.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxyFactory.java
new file mode 100644
index 0000000..353ff29
--- /dev/null
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxyFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources.mock;
+
+import org.apache.samza.rest.proxy.task.TaskProxy;
+import org.apache.samza.rest.proxy.task.TaskProxyFactory;
+import org.apache.samza.rest.proxy.task.TaskResourceConfig;
+
+
+public class MockTaskProxyFactory implements TaskProxyFactory {
+
+  @Override
+  public TaskProxy getTaskProxy(TaskResourceConfig config) {
+    return new MockTaskProxy();
+  }
+}


[2/2] samza git commit: SAMZA-1154: Tasks endpoint to list the complete details of all tasks related to a job

Posted by xi...@apache.org.
SAMZA-1154: Tasks endpoint to list the complete details of all tasks related to a job


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/26280cac
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/26280cac
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/26280cac

Branch: refs/heads/master
Commit: 26280cac2afb4066c10e669e98e4dfcb4415e79a
Parents: d399d6f
Author: Shanthoosh Venkataraman <sa...@gmail.com>
Authored: Mon Mar 20 10:43:29 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon Mar 20 10:53:57 2017 -0700

----------------------------------------------------------------------
 .../versioned/rest/resource-directory.md        |   3 +-
 .../versioned/rest/resources/tasks.md           | 125 +++++++++
 .../standalone/StandaloneJobCoordinator.java    |  12 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  10 +-
 .../samza/coordinator/JobModelManager.scala     | 279 ++++++++++---------
 .../scala/org/apache/samza/job/JobRunner.scala  |   2 +-
 .../main/scala/org/apache/samza/util/Util.scala |  12 +-
 .../samza/coordinator/TestJobCoordinator.scala  |  34 ++-
 .../src/main/config/samza-rest.properties       |   3 +
 .../org/apache/samza/rest/model/Partition.java  | 103 +++++++
 .../java/org/apache/samza/rest/model/Task.java  | 137 +++++++++
 .../samza/rest/proxy/job/AbstractJobProxy.java  |  21 +-
 .../samza/rest/proxy/job/JobProxyFactory.java   |   2 +-
 .../rest/proxy/job/SimpleYarnJobProxy.java      |   8 +-
 .../proxy/job/SimpleYarnJobProxyFactory.java    |  14 +-
 .../samza/rest/proxy/task/SamzaTaskProxy.java   | 169 +++++++++++
 .../rest/proxy/task/SamzaTaskProxyFactory.java  |  54 ++++
 .../apache/samza/rest/proxy/task/TaskProxy.java |  44 +++
 .../samza/rest/proxy/task/TaskProxyFactory.java |  41 +++
 .../rest/proxy/task/TaskResourceConfig.java     |  50 ++++
 .../rest/resources/BaseResourceConfig.java      |  87 ++++++
 .../rest/resources/DefaultResourceFactory.java  |   7 +-
 .../samza/rest/resources/JobsResource.java      |  20 +-
 .../rest/resources/JobsResourceConfig.java      |  40 +--
 .../apache/samza/rest/resources/Responses.java  |  52 ++++
 .../samza/rest/resources/TasksResource.java     |  97 +++++++
 .../samza/rest/resources/TestJobsResource.java  |  13 +-
 .../samza/rest/resources/TestTasksResource.java | 105 +++++++
 .../resources/mock/MockInstallationFinder.java  |  39 +++
 .../resources/mock/MockResourceFactory.java     |  44 +++
 .../rest/resources/mock/MockTaskProxy.java      |  72 +++++
 .../resources/mock/MockTaskProxyFactory.java    |  32 +++
 32 files changed, 1475 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/docs/learn/documentation/versioned/rest/resource-directory.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/rest/resource-directory.md b/docs/learn/documentation/versioned/rest/resource-directory.md
index 79746d1..3d7bfe4 100644
--- a/docs/learn/documentation/versioned/rest/resource-directory.md
+++ b/docs/learn/documentation/versioned/rest/resource-directory.md
@@ -22,6 +22,7 @@ title: Resource Directory
 The Samza REST Service ships with the JAX-RS Resources listed below.
 
 - [JobsResource](resources/jobs.html)
-- (Second resource coming soon)
+- [TasksResource](resources/tasks.html)
 
 ## [Jobs Resource &raquo;](resources/jobs.html)
+## [Tasks Resource &raquo;](resources/tasks.html)

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/docs/learn/documentation/versioned/rest/resources/tasks.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/rest/resources/tasks.md b/docs/learn/documentation/versioned/rest/resources/tasks.md
new file mode 100644
index 0000000..385848e
--- /dev/null
+++ b/docs/learn/documentation/versioned/rest/resources/tasks.md
@@ -0,0 +1,125 @@
+---
+layout: page
+title: Tasks Resource
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+This resource exposes endpoints to support operations at the tasks scope. The initial implementation includes the ability to list all the tasks for a particular job.
+This is a sub-resource of the [Jobs Resource &raquo;](jobs.html) and is not intended to be used independently.
+
+Responses of individual endpoints will vary in accordance with their functionality and scope. However, the error
+messages of all of the tasks resource end points will be of the following form.
+
+**Error Message**
+
+Every error response will have the following structure:
+
+{% highlight json %}
+{
+    "message": "Unrecognized status parameter: null"
+}
+{% endhighlight %}
+`message` is the only field in the response and contains a description of the problem.
+<br>
+
+## Get All Tasks
+Lists the complete details about all the tasks for a particular job
+
+######Request
+    GET /v1/jobs/{jobName}/{jobId}/tasks
+
+######Response
+Status: 200 OK
+
+{% highlight json %}
+ [
+{
+   "preferredHost" : "samza-preferredHost",
+   "taskName" : "Samza task",
+   "containerId" : "0",
+   "partitions" : [{
+                      "system" : "kafka",
+                      "stream" : "topic-name",
+                      "partitionId" : "0"
+                    }]
+ }
+ ]
+{% endhighlight %}
+
+######Response codes
+<table class="table table-condensed table-bordered table-striped">
+  <thead>
+    <tr>
+      <th>Status</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>200 OK</td><td>The operation completed successfully and all the tasks that for 
+      the job are returned.</td>
+    </tr>
+    <tr>
+      <td>404 Not Found</td><td>Invalid job instance was provided as an argument.{% highlight json %}
+{
+    "message": "Invalid arguments for getTasks. jobName: SamzaJobName jobId: SamzaJobId."
+}
+{% endhighlight %}</td>
+    </tr>
+    <tr>
+      <td>500 Server Error</td><td>There was an error executing the command on the server. e.g. The command timed out.{% highlight json %}
+{
+    "message": "Timeout waiting for get all tasks."
+}
+{% endhighlight %}</td>
+    </tr>
+  </tbody>
+</table>
+<br/>
+
+<br/>
+
+###Design
+###Abstractions
+There are two primary abstractions that are required by the TasksResource that users can implement to handle any details specific to their environment.
+
+1.  **TaskProxy**: This interface is the central point of interacting with Samza tasks. It exposes a method to get all the tasks of a Samza job.
+2.  **InstallationFinder**: The InstallationFinder provides a generic interface to discover all the installed jobs, hiding any customizations in the job package structure and its location. The InstallationFinder also resolves the job configuration, which is used to validate and identify the job.
+
+## Configuration
+The TasksResource properties should be specified in the same file as the Samza REST configuration.
+
+<table class="table table-condensed table-bordered table-striped">
+  <thead>
+    <tr>
+      <th>Name</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>task.proxy.factory.class</td><td><b>Required:</b> The TaskProxyFactory that will be used to create the TaskProxy instances. The value is a fully-qualified class name which must implement TaskProxyFactory. Samza ships with one implementation: <pre>org.apache.samza.rest.proxy.task.SamzaTaskProxy</pre> <li> gets the details of all the tasks of a job. It uses the <pre>SimpleInstallationRecord</pre> to interact with Samza jobs installed on disk.</li></td>
+    </tr>
+    <tr>
+      <td>job.installations.path</td><td><b>Required:</b> The file system path which contains the Samza job installations. The path must be on the same host as the Samza REST Service. Each installation must be a directory with structure conforming to the expectations of the InstallationRecord implementation used by the JobProxy.</td>
+    </tr>
+    <tr>
+      <td>job.config.factory.class</td><td>The config factory to use for reading Samza job configs. This is used to fetch the job.name and job.id properties for each job instance in the InstallationRecord. It's also used to validate that a particular directory within the installation path actually contains Samza jobs. If not specified <pre>org.apache.samza.config.factories.PropertiesConfigFactory</pre> will be used. </td>
+    </tr>
+  </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
index 46dbf30..b2927f4 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
@@ -19,12 +19,12 @@
 package org.apache.samza.standalone;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.JobModelManager$;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.processor.SamzaContainerController;
 import org.apache.samza.system.StreamMetadataCache;
@@ -65,7 +65,7 @@ public class StandaloneJobCoordinator implements JobCoordinator {
   private static final Logger log = LoggerFactory.getLogger(StandaloneJobCoordinator.class);
   private final int processorId;
   private final Config config;
-  private final JobModelManager jobModelManager;
+  private final JobModel jobModel;
   private final SamzaContainerController containerController;
 
   @VisibleForTesting
@@ -73,11 +73,11 @@ public class StandaloneJobCoordinator implements JobCoordinator {
       int processorId,
       Config config,
       SamzaContainerController containerController,
-      JobModelManager jobModelManager) {
+      JobModel jobModel) {
     this.processorId = processorId;
     this.config = config;
     this.containerController = containerController;
-    this.jobModelManager = jobModelManager;
+    this.jobModel = jobModel;
   }
 
   public StandaloneJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
@@ -105,7 +105,7 @@ public class StandaloneJobCoordinator implements JobCoordinator {
      * TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
      * (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
      */
-    this.jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null, null);
+    this.jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null);
   }
 
   @Override
@@ -143,6 +143,6 @@ public class StandaloneJobCoordinator implements JobCoordinator {
 
   @Override
   public JobModel getJobModel() {
-    return jobModelManager.jobModel();
+    return jobModel;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 1d16d4a..944c438 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.samza.zk;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,7 +29,6 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.JobModelManager$;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.processor.SamzaContainerController;
 import org.apache.samza.system.StreamMetadataCache;
@@ -57,7 +57,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
   private JobModel newJobModel;
   private String newJobModelVersion;  // version published in ZK (by the leader)
-  private JobModelManager jobModelManager;
+  private JobModel jobModel;
 
   public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) {
     this.zkUtils = zkUtils;
@@ -201,9 +201,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     }
     log.info("generate new job model: processorsIds: " + sb.toString());
 
-    jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null,
-        containerIds);
-    JobModel jobModel = jobModelManager.jobModel();
+    jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, containerIds);
 
     log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
 
@@ -218,4 +216,4 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
     log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 14d5dff..4122c87 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -26,104 +26,145 @@ import java.util.concurrent.atomic.AtomicReference
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{Config, StorageConfig}
+import org.apache.samza.config.Config
+import org.apache.samza.config.StorageConfig
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
-import org.apache.samza.container.grouper.task.{BalancingTaskNameGrouper, TaskNameGrouperFactory}
-import org.apache.samza.container.{LocalityManager, TaskName}
-import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
+import org.apache.samza.container.grouper.task.BalancingTaskNameGrouper
+import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
+import org.apache.samza.container.LocalityManager
+import org.apache.samza.container.TaskName
+import org.apache.samza.coordinator.server.HttpServer
+import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
-import org.apache.samza.job.model.{JobModel, TaskModel}
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.TaskModel
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.system.{ExtendedSystemAdmin, StreamMetadataCache, SystemFactory, SystemStreamPartition, SystemStreamPartitionMatcher}
-import org.apache.samza.util.{Logging, Util}
-import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.system.ExtendedSystemAdmin
+import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamPartitionMatcher
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.util.Logging
+import org.apache.samza.util.Util
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-
 /**
- * Helper companion object that is responsible for wiring up a JobCoordinator
+ * Helper companion object that is responsible for wiring up a JobModelManager
  * given a Config object.
  */
 object JobModelManager extends Logging {
 
+  val SOURCE = "JobModelManager"
   /**
-   * a volatile value to store the current instantiated <code>JobCoordinator</code>
+   * a volatile value to store the current instantiated <code>JobModelManager</code>
    */
   @volatile var currentJobModelManager: JobModelManager = null
   val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
 
   /**
-   * @param coordinatorSystemConfig A config object that contains job.name,
-   * job.id, and all system.&lt;job-coordinator-system-name&gt;.*
-   * configuration. The method will use this config to read all configuration
-   * from the coordinator stream, and instantiate a JobCoordinator.
+   * Does the following actions for a job.
+   * a) Reads the jobModel from coordinator stream using the job's configuration.
+   * b) Creates changeLogStream for task stores if it does not exists.
+   * c) Recomputes changelog partition mapping based on jobModel and job's configuration
+   * and writes it to the coordinator stream.
+   * d) Builds JobModelManager using the jobModel read from coordinator stream.
+   * @param coordinatorSystemConfig A config object that contains job.name
+   *                                job.id, and all system.&lt;job-coordinator-system-name&gt;.*
+   *                                configuration. The method will use this config to read all configuration
+   *                                from the coordinator stream, and instantiate a JobModelManager.
    */
   def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobModelManager = {
-    val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
-    val coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
-    val coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
-    info("Registering coordinator system stream.")
-    coordinatorSystemConsumer.register
-    debug("Starting coordinator system stream.")
-    coordinatorSystemConsumer.start
-    debug("Bootstrapping coordinator system stream.")
-    coordinatorSystemConsumer.bootstrap
-    val source = "Job-coordinator"
-    coordinatorSystemProducer.register(source)
-    info("Registering coordinator system stream producer.")
-    val config = coordinatorSystemConsumer.getConfig
-    info("Got config: %s" format config)
-    val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
-    val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
+    var coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = null
+    var coordinatorSystemProducer: CoordinatorStreamSystemProducer = null
+    try {
+      val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
+      coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
+      coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
+      info("Registering coordinator system stream consumer.")
+      coordinatorSystemConsumer.register
+      debug("Starting coordinator system stream consumer.")
+      coordinatorSystemConsumer.start
+      debug("Bootstrapping coordinator system stream consumer.")
+      coordinatorSystemConsumer.bootstrap
+      info("Registering coordinator system stream producer.")
+      coordinatorSystemProducer.register(SOURCE)
+
+      val config = coordinatorSystemConsumer.getConfig
+      info("Got config: %s" format config)
+      val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, SOURCE)
+      changelogManager.start()
+      val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
+      // We don't need to start() localityManager as they share the same instances with checkpoint and changelog managers.
+      // TODO: This code will go away with refactoring - SAMZA-678
 
-    val systemNames = getSystemNames(config)
+      localityManager.start()
 
-    // Map the name of each system to the corresponding SystemAdmin
-    val systemAdmins = systemNames.map(systemName => {
-      val systemFactoryClassName = config
-        .getSystemFactory(systemName)
-        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
-      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
-      systemName -> systemFactory.getAdmin(systemName, config)
-    }).toMap
+      // Map the name of each system to the corresponding SystemAdmin
+      val systemAdmins = getSystemAdmins(config)
 
-    val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0)
-    var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
-    if (config.getMonitorPartitionChange) {
-      val extendedSystemAdmins = systemAdmins.filter{
-                                                      case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
-                                                    }
-      val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.containsKey(systemStream.getSystem))
-      if (inputStreamsToMonitor.nonEmpty) {
-        streamPartitionCountMonitor = new StreamPartitionCountMonitor(
-          setAsJavaSet(inputStreamsToMonitor),
-          streamMetadataCache,
-          metricsRegistryMap,
-          config.getMonitorPartitionChangeFrequency)
+      val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0)
+      var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
+      if (config.getMonitorPartitionChange) {
+        val extendedSystemAdmins = systemAdmins.filter{
+          case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
+        }
+        val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.containsKey(systemStream.getSystem))
+        if (inputStreamsToMonitor.nonEmpty) {
+          streamPartitionCountMonitor = new StreamPartitionCountMonitor(
+            setAsJavaSet(inputStreamsToMonitor),
+            streamMetadataCache,
+            metricsRegistryMap,
+            config.getMonitorPartitionChangeFrequency)
+        }
+      }
+      val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping()
+      val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, streamPartitionCountMonitor, null)
+      val jobModel = jobModelManager.jobModel
+      // Save the changelog mapping back to the ChangelogPartitionmanager
+      // newChangelogPartitionMapping is the merging of all current task:changelog
+      // assignments with whatever we had before (previousChangelogPartitionMapping).
+      // We must persist legacy changelog assignments so that
+      // maxChangelogPartitionId always has the absolute max, not the current
+      // max (in case the task with the highest changelog partition mapping
+      // disappears.
+      val newChangelogPartitionMapping = jobModel.getContainers.flatMap(_._2.getTasks).map{case (taskName,taskModel) => {
+        taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
+      }}.toMap ++ previousChangelogPartitionMapping
+      info("Saving task-to-changelog partition mapping: %s" format newChangelogPartitionMapping)
+      changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping)
+
+      createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions)
+
+      jobModelManager
+    } finally {
+      if (coordinatorSystemConsumer != null) {
+        coordinatorSystemConsumer.stop()
+      }
+      if (coordinatorSystemProducer != null) {
+        coordinatorSystemProducer.stop()
       }
     }
-
-    val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor, null)
-    createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions)
-
-    jobCoordinator
   }
-
   def apply(coordinatorSystemConfig: Config): JobModelManager = apply(coordinatorSystemConfig, new MetricsRegistryMap())
 
   /**
-   * Build a JobCoordinator using a Samza job's configuration.
+   * Build a JobModelManager using a Samza job's configuration.
    */
-  def getJobCoordinator(config: Config,
-                        changelogManager: ChangelogPartitionManager,
-                        localityManager: LocalityManager,
-                        streamMetadataCache: StreamMetadataCache,
-                        streamPartitionCountMonitor: StreamPartitionCountMonitor,
-                        containerIds: java.util.List[Integer]) = {
-    val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache, containerIds)
+  private def getJobModelManager(config: Config,
+                                changeLogMapping: util.Map[TaskName, Integer],
+                                localityManager: LocalityManager,
+                                streamMetadataCache: StreamMetadataCache,
+                                streamPartitionCountMonitor: StreamPartitionCountMonitor,
+                                containerIds: java.util.List[Integer]) = {
+    val jobModel: JobModel = readJobModel(config, changeLogMapping, localityManager, streamMetadataCache, containerIds)
     jobModelRef.set(jobModel)
 
     val server = new HttpServer
@@ -136,7 +177,7 @@ object JobModelManager extends Logging {
    * For each input stream specified in config, exactly determine its
    * partitions, returning a set of SystemStreamPartitions containing them all.
    */
-  def getInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache) = {
+  private def getInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache) = {
     val inputSystemStreams = config.getInputStreams
 
     // Get the set of partitions for each SystemStream from the stream metadata
@@ -151,7 +192,7 @@ object JobModelManager extends Logging {
       }.toSet
   }
 
-  def getMatchedInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache) : Set[SystemStreamPartition] = {
+  private def getMatchedInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache): Set[SystemStreamPartition] = {
     val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache)
     config.getSSPMatcherClass match {
       case Some(s) => {
@@ -172,98 +213,40 @@ object JobModelManager extends Logging {
     }
   }
 
-
   /**
    * Gets a SystemStreamPartitionGrouper object from the configuration.
    */
-  def getSystemStreamPartitionGrouper(config: Config) = {
+  private def getSystemStreamPartitionGrouper(config: Config) = {
     val factoryString = config.getSystemStreamPartitionGrouperFactory
     val factory = Util.getObj[SystemStreamPartitionGrouperFactory](factoryString)
     factory.getSystemStreamPartitionGrouper(config)
   }
 
   /**
-   * The method intializes the jobModel and returns it to the caller.
-   * Note: refreshJobModel can be used as a lambda for JobModel generation in the future.
+   * The function reads the latest checkpoint from the underlying coordinator stream and
+   * builds a new JobModel.
    */
-  private def initializeJobModel(config: Config,
-                                 changelogManager: ChangelogPartitionManager,
-                                 localityManager: LocalityManager,
-                                 streamMetadataCache: StreamMetadataCache,
-                                 containerIds: java.util.List[Integer]): JobModel = {
+  def readJobModel(config: Config,
+                   changeLogPartitionMapping: util.Map[TaskName, Integer],
+                   localityManager: LocalityManager,
+                   streamMetadataCache: StreamMetadataCache,
+                   containerIds: java.util.List[Integer]): JobModel = {
     // Do grouping to fetch TaskName to SSP mapping
     val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache)
     val grouper = getSystemStreamPartitionGrouper(config)
     val groups = grouper.group(allSystemStreamPartitions)
     info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups.keySet()))
 
-    // Initialize the ChangelogPartitionManager and the CheckpointManager
-    val previousChangelogMapping = if (changelogManager != null)
-    {
-      changelogManager.start()
-      changelogManager.readChangeLogPartitionMapping()
-    }
-    else
-    {
-      new util.HashMap[TaskName, Integer]()
-    }
-    // We don't need to start() localityManager as they share the same instances with checkpoint and changelog managers.
-    // TODO: This code will go away with refactoring - SAMZA-678
-
-    if (localityManager != null) {
-      localityManager.start()
-    }
-
-    // Generate the jobModel
-    def jobModelGenerator(): JobModel = refreshJobModel(config,
-                                                        groups,
-                                                        previousChangelogMapping,
-                                                        localityManager,
-                                                        containerIds)
-
-    val jobModel = jobModelGenerator()
-
-    // Save the changelog mapping back to the ChangelogPartitionmanager
-    if (changelogManager != null)
-    {
-      // newChangelogMapping is the merging of all current task:changelog
-      // assignments with whatever we had before (previousChangelogMapping).
-      // We must persist legacy changelog assignments so that
-      // maxChangelogPartitionId always has the absolute max, not the current
-      // max (in case the task with the highest changelog partition mapping
-      // disappears.
-      val newChangelogMapping = jobModel.getContainers.flatMap(_._2.getTasks).map{case (taskName,taskModel) => {
-                                                 taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
-                                               }}.toMap ++ previousChangelogMapping
-      info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
-      changelogManager.writeChangeLogPartitionMapping(newChangelogMapping)
-    }
-
-    jobModel
-  }
-
-  /**
-   * Build a full Samza job model. The function reads the latest checkpoint from the underlying coordinator stream and
-   * builds a new JobModel.
-   * Note: This method no longer needs to be thread safe because HTTP request from a container no longer triggers a jobmodel
-   * refresh. Hence, there is no need for synchronization as before.
-   */
-  private def refreshJobModel(config: Config,
-                              groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
-                              previousChangelogMapping: util.Map[TaskName, Integer],
-                              localityManager: LocalityManager,
-                              containerIds: java.util.List[Integer]): JobModel = {
-
     // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
     // mapping.
-    var maxChangelogPartitionId = previousChangelogMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+    var maxChangelogPartitionId = changeLogPartitionMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
     // Sort the groups prior to assigning the changelog mapping so that the mapping is reproducible and intuitive
     val sortedGroups = new util.TreeMap[TaskName, util.Set[SystemStreamPartition]](groups)
 
     // Assign all SystemStreamPartitions to TaskNames.
     val taskModels = {
       sortedGroups.map { case (taskName, systemStreamPartitions) =>
-        val changelogPartition = Option(previousChangelogMapping.get(taskName)) match {
+        val changelogPartition = Option(changeLogPartitionMapping.get(taskName)) match {
           case Some(changelogPartitionId) => new Partition(changelogPartitionId)
           case _ =>
             // If we've never seen this TaskName before, then assign it a
@@ -291,6 +274,24 @@ object JobModelManager extends Logging {
     new JobModel(config, containerMap, localityManager)
   }
 
+  /**
+   * Instantiates the system admins based upon the system factory class available in {@param config}.
+   * @param config contains adequate information to instantiate the SystemAdmin.
+   * @return a map of SystemName(String) to the instantiated SystemAdmin.
+   */
+  def getSystemAdmins(config: Config) : Map[String, SystemAdmin] = {
+    val systemNames = getSystemNames(config)
+    // Map the name of each system to the corresponding SystemAdmin
+    val systemAdmins = systemNames.map(systemName => {
+      val systemFactoryClassName = config
+        .getSystemFactory(systemName)
+        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
+      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+      systemName -> systemFactory.getAdmin(systemName, config)
+    }).toMap
+    systemAdmins
+  }
+
   private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) {
     val changeLogSystemStreams = config
       .getStoreNames
@@ -313,7 +314,7 @@ object JobModelManager extends Logging {
 }
 
 /**
- * <p>JobCoordinator is responsible for managing the lifecycle of a Samza job
+ * <p>JobModelManager is responsible for managing the lifecycle of a Samza job
  * once it's been started. This includes starting and stopping containers,
  * managing configuration, etc.</p>
  *
@@ -321,7 +322,7 @@ object JobModelManager extends Logging {
  * must integrate with the job coordinator.</p>
  *
  * <p>This class' API is currently unstable, and likely to change. The
- * coordinator's responsibility is simply to propagate the job model, and HTTP
+ * responsibility is simply to propagate the job model, and HTTP
  * server right now.</p>
  */
 class JobModelManager(

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 6d8b24d..70e5a51 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,7 +20,7 @@
 package org.apache.samza.job
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{ConfigRewriter, Config}
+import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
 import org.apache.samza.job.ApplicationStatus.Running

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 97bd22a..4a945d2 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -23,19 +23,23 @@ import java.net._
 import java.io._
 import java.lang.management.ManagementFactory
 import java.util.zip.CRC32
-import org.apache.samza.config.ConfigRewriter
 import org.apache.samza.{SamzaException, Partition}
 import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
 import java.util.Random
+
 import org.apache.samza.config.Config
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.ConfigRewriter
+import org.apache.samza.config.JobConfig
+import org.apache.samza.config.MapConfig
 import org.apache.samza.config.SystemConfig
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.ConfigException
-import org.apache.samza.config.MapConfig
+
 import scala.collection.JavaConversions._
-import org.apache.samza.config.JobConfig
 import java.io.InputStreamReader
+
+
 import scala.collection.immutable.Map
 import org.apache.samza.serializers._
 

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index fcabc69..8f02c78 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -22,29 +22,39 @@ package org.apache.samza.coordinator
 import java.util
 
 import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
-import org.apache.samza.job.MockJobFactory
-import org.apache.samza.job.local.{ProcessJobFactory, ThreadJobFactory}
+import org.apache.samza.job.local.ProcessJobFactory
+import org.apache.samza.job.local.ThreadJobFactory
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.util.Util
-import org.junit.{After, Test}
+import org.junit.After
+import org.junit.Test
 import org.junit.Assert._
+
 import scala.collection.JavaConversions._
 import org.apache.samza.config.MapConfig
 import org.apache.samza.config.TaskConfig
 import org.apache.samza.config.SystemConfig
-import org.apache.samza.container.{SamzaContainer, TaskName}
+import org.apache.samza.container.SamzaContainer
+import org.apache.samza.container.TaskName
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.Config
 import org.apache.samza.system._
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.{SamzaException, Partition}
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.TaskModel
 import org.apache.samza.config.JobConfig
-import org.apache.samza.coordinator.stream.{MockCoordinatorStreamWrappedConsumer, MockCoordinatorStreamSystemFactory}
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamWrappedConsumer
+import org.apache.samza.job.MockJobFactory
+import org.scalatest.{FlatSpec, PrivateMethodTester}
+
+import scala.collection.immutable
 
-class TestJobCoordinator {
+
+class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
   /**
    * Builds a coordinator from config, and then compares it with what was
    * expected. We simulate having a checkpoint manager that has 2 task
@@ -251,9 +261,11 @@ class TestJobCoordinator {
     }).toMap
 
     val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+    val getInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getInputStreamPartitions)
+    val getMatchedInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getMatchedInputStreamPartitions)
 
-    val allSSP = JobModelManager.getInputStreamPartitions(config, streamMetadataCache)
-    val matchedSSP = JobModelManager.getMatchedInputStreamPartitions(config, streamMetadataCache)
+    val allSSP = JobModelManager invokePrivate getInputStreamPartitions(config, streamMetadataCache)
+    val matchedSSP = JobModelManager invokePrivate  getMatchedInputStreamPartitions(config, streamMetadataCache)
     assertEquals(matchedSSP, allSSP)
   }
 
@@ -320,7 +332,7 @@ class MockSystemAdmin extends ExtendedSystemAdmin {
   override def createCoordinatorStream(streamName: String) {
     new UnsupportedOperationException("Method not implemented.")
   }
-  
+
   override def offsetComparator(offset1: String, offset2: String) = null
 
   override def getSystemStreamPartitionCounts(streamNames: util.Set[String],
@@ -341,4 +353,4 @@ class MockSystemAdmin extends ExtendedSystemAdmin {
   override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = null
 
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/config/samza-rest.properties
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/config/samza-rest.properties b/samza-rest/src/main/config/samza-rest.properties
index 7be0b47..0b9dcc3 100644
--- a/samza-rest/src/main/config/samza-rest.properties
+++ b/samza-rest/src/main/config/samza-rest.properties
@@ -21,3 +21,6 @@ services.rest.port=9139
 # JobsResource
 job.proxy.factory.class=org.apache.samza.rest.proxy.job.SimpleYarnJobProxyFactory
 job.installations.path=/export/content/samza/deploy/
+
+# TasksResource
+task.proxy.factory.class=org.apache.samza.rest.proxy.task.SamzaTaskProxyFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java b/samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java
new file mode 100644
index 0000000..8911cb8
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/Partition.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.model;
+
+import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Provides a client view of the samza system stream partition.
+ * Includes the system name, stream name and partition id
+ */
+public class Partition {
+
+  private String system;
+  private String stream;
+  private int partitionId;
+
+  public Partition() {
+  }
+
+  public Partition(@JsonProperty("system") String system,
+                   @JsonProperty("stream") String stream,
+                   @JsonProperty("partitionId") int partitionId) {
+    this.system = system;
+    this.stream = stream;
+    this.partitionId = partitionId;
+  }
+
+  public Partition(SystemStreamPartition systemStreamPartition) {
+    this(systemStreamPartition.getSystem(),
+         systemStreamPartition.getStream(),
+         systemStreamPartition.getPartition().getPartitionId());
+  }
+
+  public String getSystem() {
+    return system;
+  }
+
+  public void setSystem(String system) {
+    this.system = system;
+  }
+
+  public String getStream() {
+    return stream;
+  }
+
+  public void setStream(String stream) {
+    this.stream = stream;
+  }
+
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  public void setPartitionId(int partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof Partition)) {
+      return false;
+    }
+
+    Partition partition = (Partition) o;
+
+    if (partitionId != partition.partitionId) {
+      return false;
+    }
+    if (!system.equals(partition.system)) {
+      return false;
+    }
+    return stream.equals(partition.stream);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = system.hashCode();
+    result = 31 * result + stream.hashCode();
+    result = 31 * result + partitionId;
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
new file mode 100644
index 0000000..f1225ec
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.model;
+
+import java.util.List;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Provides a client view of a samza task.
+ * Includes the preferred preferredHost, taskName, containerId, containerId and list of partitions.
+ *
+ */
+public class Task {
+
+  // preferred host of the task
+  private String preferredHost;
+
+  // name of the task
+  private String taskName;
+
+  // containerId of the samza container in which the task is running
+  private int containerId;
+
+  // list of partitions that belong to the task.
+  private List<Partition> partitions;
+
+  // list of stores that are associated with the task.
+  private List<String> storeNames;
+
+  public Task() {
+  }
+
+  public Task(@JsonProperty("preferredHost") String preferredHost,
+              @JsonProperty("taskName") String taskName,
+              @JsonProperty("containerId") int containerId,
+              @JsonProperty("partitions") List<Partition> partitions,
+              @JsonProperty("storeNames") List<String> storeNames) {
+    this.preferredHost = preferredHost;
+    this.taskName = taskName;
+    this.containerId = containerId;
+    this.partitions = partitions;
+    this.storeNames = storeNames;
+  }
+
+  public String getPreferredHost() {
+    return preferredHost;
+  }
+
+  public void setPreferredHost(String preferredHost) {
+    this.preferredHost = preferredHost;
+  }
+
+  public int getContainerId() {
+    return containerId;
+  }
+
+  public void setContainerId(int containerId) {
+    this.containerId = containerId;
+  }
+
+  public List<Partition> getPartitions() {
+    return partitions;
+  }
+
+  public void setPartitions(List<Partition> partitions) {
+    this.partitions = partitions;
+  }
+
+  public String getTaskName() {
+    return taskName;
+  }
+
+  public void setTaskName(String taskName) {
+    this.taskName = taskName;
+  }
+
+  public List<String> getStoreNames() {
+    return storeNames;
+  }
+
+  public void setStoreNames(List<String> storeNames) {
+    this.storeNames = storeNames;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof Task)) {
+      return false;
+    }
+
+    Task task = (Task) o;
+
+    if (containerId != task.containerId) {
+      return false;
+    }
+    if (!preferredHost.equals(task.preferredHost)) {
+      return false;
+    }
+    if (!taskName.equals(task.taskName)) {
+      return false;
+    }
+    if (!partitions.equals(task.partitions)) {
+      return false;
+    }
+    return storeNames.equals(task.storeNames);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = preferredHost.hashCode();
+    result = 31 * result + taskName.hashCode();
+    result = 31 * result + containerId;
+    result = 31 * result + partitions.hashCode();
+    result = 31 * result + storeNames.hashCode();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
index 4d8647f..492385f 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
@@ -24,8 +24,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.ConfigFactory;
-import org.apache.samza.config.factories.PropertiesConfigFactory;
 import org.apache.samza.rest.model.Job;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.resources.JobsResourceConfig;
@@ -33,7 +31,6 @@ import org.apache.samza.util.ClassLoaderHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Implements a subset of the {@link JobProxy} interface with the default, cluster-agnostic,
  * implementations. Subclasses are expected to override these default methods where necessary.
@@ -53,7 +50,7 @@ public abstract class AbstractJobProxy implements JobProxy {
     String jobProxyFactory = config.getJobProxyFactory();
     if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) {
       try {
-        JobProxyFactory factory = ClassLoaderHelper.<JobProxyFactory>fromClassName(jobProxyFactory);
+        JobProxyFactory factory = ClassLoaderHelper.fromClassName(jobProxyFactory);
         return factory.getJobProxy(config);
       } catch (Exception e) {
         throw new SamzaException(e);
@@ -110,22 +107,6 @@ public abstract class AbstractJobProxy implements JobProxy {
   }
 
   /**
-   * @return the {@link ConfigFactory} to use to read job configuration files.
-   */
-  protected ConfigFactory getJobConfigFactory() {
-    String configFactoryClassName = config.get(JobsResourceConfig.CONFIG_JOB_CONFIG_FACTORY);
-    if (configFactoryClassName == null) {
-      configFactoryClassName = PropertiesConfigFactory.class.getCanonicalName();
-      log.warn("{} not specified. Defaulting to {}", JobsResourceConfig.CONFIG_JOB_CONFIG_FACTORY, configFactoryClassName);
-    }
-
-    try {
-      return ClassLoaderHelper.<ConfigFactory>fromClassName(configFactoryClassName);
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-  /**
    * @return the {@link JobStatusProvider} to use in retrieving the job status.
    */
   protected abstract JobStatusProvider getJobStatusProvider();

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java
index 067711a..d54a6f6 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/JobProxyFactory.java
@@ -34,7 +34,7 @@ public interface JobProxyFactory {
   /**
    * Creates a new {@link JobProxy} and initializes it with the specified config.
    *
-   * @param config  the {@link org.apache.samza.rest.SamzaRestConfig} to pass to the proxy.
+   * @param config  the {@link JobsResourceConfig} to pass to the proxy.
    * @return        the created proxy.
    */
   JobProxy getJobProxy(JobsResourceConfig config);

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
index a935c98..677be1a 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
@@ -25,10 +25,10 @@ import org.apache.samza.rest.proxy.installation.InstallationFinder;
 import org.apache.samza.rest.proxy.installation.InstallationRecord;
 import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
 import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.apache.samza.util.ClassLoaderHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Extends the {@link ScriptJobProxy} with methods specific to simple Samza deployments.
  */
@@ -45,10 +45,10 @@ public class SimpleYarnJobProxy extends ScriptJobProxy {
 
   private final InstallationFinder installFinder;
 
-  public SimpleYarnJobProxy(JobsResourceConfig config) {
+  public SimpleYarnJobProxy(JobsResourceConfig config) throws Exception {
     super(config);
-
-    installFinder = new SimpleInstallationFinder(config.getInstallationsPath(), getJobConfigFactory());
+    this.installFinder = new SimpleInstallationFinder(config.getInstallationsPath(),
+                                                      ClassLoaderHelper.fromClassName(config.getJobConfigFactory()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
index 11d93d4..1e8556b 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxyFactory.java
@@ -18,18 +18,28 @@
  */
 package org.apache.samza.rest.proxy.job;
 
+import org.apache.samza.SamzaException;
 import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Factory to produce SimpleJobProxy instances.
  *
- * See {@link AbstractJobProxy#fromFactory(org.apache.samza.rest.resources.JobsResourceConfig)}
+ * See {@link AbstractJobProxy#fromFactory(JobsResourceConfig)}
  */
 public class SimpleYarnJobProxyFactory implements JobProxyFactory {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleYarnJobProxyFactory.class);
+
   @Override
   public JobProxy getJobProxy(JobsResourceConfig config) {
-    return new SimpleYarnJobProxy(config);
+    try {
+      return new SimpleYarnJobProxy(config);
+    } catch (Exception e) {
+      LOG.error("Exception during instantiation of SimpleYarnJobProxy: ", e);
+      throw new SamzaException(e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
new file mode 100644
index 0000000..27c88e5
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigFactory;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.rest.model.Partition;
+import org.apache.samza.rest.model.Task;
+import org.apache.samza.rest.proxy.installation.InstallationFinder;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.storage.ChangelogPartitionManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * {@link TaskProxy} interface implementation for samza jobs running in yarn execution environment.
+ * getTasks implementation reads the jobModel of the job specified by {@link JobInstance} from coordinator stream.
+ */
+public class SamzaTaskProxy implements TaskProxy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaTaskProxy.class);
+
+  private static final MetricsRegistryMap METRICS_REGISTRY = new MetricsRegistryMap();
+
+  private static final String SOURCE = "SamzaTaskProxy";
+
+  private final TaskResourceConfig taskResourceConfig;
+
+  private final InstallationFinder installFinder;
+
+  public SamzaTaskProxy(TaskResourceConfig taskResourceConfig, InstallationFinder installFinder) {
+    this.taskResourceConfig = taskResourceConfig;
+    this.installFinder = installFinder;
+  }
+
+  /**
+   * Fetches the complete job model from the coordinator stream based upon the provided {@link JobInstance}
+   * param, transforms it to a list of {@link Task} and returns it.
+   * {@inheritDoc}
+   */
+  @Override
+  public List<Task> getTasks(JobInstance jobInstance)
+      throws IOException, InterruptedException {
+    Preconditions.checkArgument(installFinder.isInstalled(jobInstance),
+                                String.format("Invalid job instance : %s", jobInstance));
+    JobModel jobModel = getJobModel(jobInstance);
+    StorageConfig storageConfig = new StorageConfig(jobModel.getConfig());
+
+    List<String> storeNames = JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
+    Map<Integer, String> containerLocality = jobModel.getAllContainerLocality();
+    List<Task> tasks = new ArrayList<>();
+    for (ContainerModel containerModel : jobModel.getContainers().values()) {
+      int containerId = containerModel.getContainerId();
+      String host = containerLocality.get(containerId);
+      for (TaskModel taskModel : containerModel.getTasks().values()) {
+        String taskName = taskModel.getTaskName().getTaskName();
+        List<Partition> partitions = taskModel.getSystemStreamPartitions()
+                                              .stream()
+                                              .map(Partition::new).collect(Collectors.toList());
+        tasks.add(new Task(host, taskName, containerId, partitions, storeNames));
+      }
+    }
+    return tasks;
+  }
+
+  /**
+   * Builds coordinator system config for the {@param jobInstance}.
+   * @param jobInstance the job instance to get the jobModel for.
+   * @return the constructed coordinator system config.
+   */
+  private Config getCoordinatorSystemConfig(JobInstance jobInstance) {
+    try {
+      InstallationRecord record = installFinder.getAllInstalledJobs().get(jobInstance);
+      ConfigFactory configFactory =  ClassLoaderHelper.fromClassName(taskResourceConfig.getJobConfigFactory());
+      Config config = configFactory.getConfig(new URI(String.format("file://%s", record.getConfigFilePath())));
+      Map<String, String> configMap = ImmutableMap.of(JobConfig.JOB_ID(), jobInstance.getJobId(),
+                                                      JobConfig.JOB_NAME(), jobInstance.getJobName());
+      return Util.buildCoordinatorStreamConfig(new MapConfig(ImmutableList.of(config, configMap)));
+    } catch (Exception e) {
+      LOG.error(String.format("Failed to get coordinator stream config for job : %s", jobInstance), e);
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * Retrieves the jobModel from the jobCoordinator.
+   * @param jobInstance the job instance (jobId, jobName).
+   * @return the JobModel fetched from the coordinator stream.
+   */
+  protected JobModel getJobModel(JobInstance jobInstance) {
+    CoordinatorStreamSystemConsumer coordinatorSystemConsumer = null;
+    CoordinatorStreamSystemProducer coordinatorSystemProducer = null;
+    try {
+      CoordinatorStreamSystemFactory coordinatorStreamSystemFactory  = new CoordinatorStreamSystemFactory();
+      Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance);
+      LOG.info("Using config: {} to create coordinatorStream producer and consumer.", coordinatorSystemConfig);
+      coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY);
+      coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, METRICS_REGISTRY);
+      LOG.info("Registering coordinator system stream consumer.");
+      coordinatorSystemConsumer.register();
+      LOG.debug("Starting coordinator system stream consumer.");
+      coordinatorSystemConsumer.start();
+      LOG.debug("Bootstrapping coordinator system stream consumer.");
+      coordinatorSystemConsumer.bootstrap();
+      LOG.info("Registering coordinator system stream producer.");
+      coordinatorSystemProducer.register(SOURCE);
+
+      Config config = coordinatorSystemConsumer.getConfig();
+      LOG.info("Got config from coordinatorSystemConsumer: {}.", config);
+      ChangelogPartitionManager changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, SOURCE);
+      changelogManager.start();
+      LocalityManager localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer);
+      localityManager.start();
+      return JobModelManager.readJobModel(config, changelogManager.readChangeLogPartitionMapping(), localityManager,
+                                          new StreamMetadataCache(JobModelManager.getSystemAdmins(config), 0, SystemClock.instance()), null);
+    } finally {
+      if (coordinatorSystemConsumer != null) {
+        coordinatorSystemConsumer.stop();
+      }
+      if (coordinatorSystemProducer != null) {
+        coordinatorSystemProducer.stop();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxyFactory.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxyFactory.java
new file mode 100644
index 0000000..7bcac7d
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxyFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.task;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.rest.proxy.installation.InstallationFinder;
+import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
+import org.apache.samza.rest.resources.BaseResourceConfig;
+import org.apache.samza.util.ClassLoaderHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Creates the {@link TaskProxy} instances.
+ */
+public class SamzaTaskProxyFactory implements TaskProxyFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaTaskProxyFactory.class);
+
+  @Override
+  public TaskProxy getTaskProxy(TaskResourceConfig config) {
+    String installationsPath = config.getInstallationsPath();
+    Preconditions.checkArgument(StringUtils.isNotEmpty(installationsPath),
+                                String.format("Config param %s is not defined.", BaseResourceConfig.CONFIG_JOB_INSTALLATIONS_PATH));
+    String configFactoryClass = config.getJobConfigFactory();
+    try {
+      InstallationFinder installFinder = new SimpleInstallationFinder(installationsPath,
+                                                                      ClassLoaderHelper.fromClassName(configFactoryClass));
+      return new SamzaTaskProxy(config, installFinder);
+    } catch (Exception e) {
+      LOG.error(String.format("Exception during instantiation through configFactory class: %s.", configFactoryClass), e);
+      throw new SamzaException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxy.java
new file mode 100644
index 0000000..54da8c9
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.task;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.samza.rest.model.Task;
+import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.rest.proxy.job.JobProxy;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+
+
+/**
+ * TaskProxy is the primary abstraction that will be used by Rest API's to interact with tasks.
+ */
+public interface TaskProxy {
+
+  /**
+   * @param jobInstance the job instance to get the tasks for.
+   * @return a list of all the {@link Task} tasks that belongs to the {@link JobInstance}.
+   *         Each task will have a preferred host and stream partitions assigned to it by
+   *         the samza job coordinator.
+   * @throws IOException if there was a problem executing the command to get the tasks.
+   * @throws InterruptedException if the thread was interrupted while waiting for the result.
+   */
+  List<Task> getTasks(JobInstance jobInstance)
+      throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxyFactory.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxyFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxyFactory.java
new file mode 100644
index 0000000..5e9957f
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskProxyFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.task;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Factory interface that will be used to create {@link TaskProxy}
+ * instances.
+ *
+ * To use a custom {@link TaskProxy}, create an implementation of this interface
+ * and instantiate the custom proxy in the getTaskProxy method. Set
+ * the config {@link TaskResourceConfig#CONFIG_TASK_PROXY_FACTORY}
+ * value to the appropriate factory implementation class.
+ */
+public interface TaskProxyFactory {
+
+  /**
+   *
+   * @param config the {@link Config} to pass to the proxy.
+   * @return the created proxy.
+   */
+  TaskProxy getTaskProxy(TaskResourceConfig config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskResourceConfig.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskResourceConfig.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskResourceConfig.java
new file mode 100644
index 0000000..40cf706
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/TaskResourceConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.rest.resources.BaseResourceConfig;
+import org.apache.samza.rest.resources.TasksResource;
+
+
+/**
+ * Configurations for the {@link TasksResource} endpoint.
+ */
+public class TaskResourceConfig extends BaseResourceConfig {
+
+  /**
+   * Specifies the canonical name of the {@link TaskProxyFactory} class to produce
+   * {@link TaskProxy} instances.
+   *
+   * To use your own proxy, implement the factory and specify the class for this config.
+   */
+  public static final String CONFIG_TASK_PROXY_FACTORY = "task.proxy.factory.class";
+
+  public TaskResourceConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * @see TaskResourceConfig#CONFIG_TASK_PROXY_FACTORY
+   * @return the canonical name of the {@link TaskProxyFactory} class to produce {@link TaskProxy} instances.
+   */
+  public String getTaskProxyFactory() {
+    return get(CONFIG_TASK_PROXY_FACTORY);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/resources/BaseResourceConfig.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/BaseResourceConfig.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/BaseResourceConfig.java
new file mode 100644
index 0000000..eca8fdc
--- /dev/null
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/BaseResourceConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.resources;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.factories.PropertiesConfigFactory;
+import org.apache.samza.rest.proxy.installation.InstallationRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class contains the common configurations that are
+ * shared between different samza-rest resources.
+ */
+public class BaseResourceConfig extends MapConfig {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseResourceConfig.class);
+
+  /**
+   * Specifies the canonical name of the {@link org.apache.samza.config.ConfigFactory} to read the job configs.
+   */
+  public static final String CONFIG_JOB_CONFIG_FACTORY = "job.config.factory.class";
+
+  /**
+   * The path where all the Samza jobs are installed (unzipped). Each subdirectory of this path
+   * is expected to be a Samza job installation and corresponds to one {@link InstallationRecord}.
+   */
+  public static final String CONFIG_JOB_INSTALLATIONS_PATH = "job.installations.path";
+
+
+  public BaseResourceConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * @see BaseResourceConfig#CONFIG_JOB_INSTALLATIONS_PATH
+   * @return the path where all the Samza jobs are installed (unzipped).
+   */
+  public String getInstallationsPath() {
+    return sanitizePath(get(CONFIG_JOB_INSTALLATIONS_PATH));
+  }
+
+  /**
+   * @see BaseResourceConfig#CONFIG_JOB_INSTALLATIONS_PATH
+   * @return the config factory class that has to be used to parse job configurations. If the config key
+   * {@link BaseResourceConfig#CONFIG_JOB_INSTALLATIONS_PATH} is not defined, then returns the {@link PropertiesConfigFactory} class name.
+   */
+  public String getJobConfigFactory() {
+    String configFactoryClassName = get(CONFIG_JOB_CONFIG_FACTORY);
+    if (configFactoryClassName == null) {
+      configFactoryClassName = PropertiesConfigFactory.class.getCanonicalName();
+      LOG.warn("{} not specified. Defaulting to {}", CONFIG_JOB_CONFIG_FACTORY, configFactoryClassName);
+    }
+    return configFactoryClassName;
+  }
+
+  /**
+   * Ensures a usable file path when the user specifies a tilde for the home path.
+   *
+   * @param rawPath the original path.
+   * @return        the updated path with the tilde resolved to home.
+   */
+  private static String sanitizePath(String rawPath) {
+    if (rawPath == null) {
+      return null;
+    }
+    return rawPath.replaceFirst("^~", System.getProperty("user.home"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
index e0224c6..0c41f55 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/DefaultResourceFactory.java
@@ -18,10 +18,10 @@
  */
 package org.apache.samza.rest.resources;
 
-import java.util.Collections;
+import com.google.common.collect.ImmutableList;
 import java.util.List;
 import org.apache.samza.config.Config;
-import org.apache.samza.rest.SamzaRestConfig;
+import org.apache.samza.rest.proxy.task.TaskResourceConfig;
 
 
 /**
@@ -30,6 +30,7 @@ import org.apache.samza.rest.SamzaRestConfig;
 public class DefaultResourceFactory implements ResourceFactory {
   @Override
   public List<? extends Object> getResourceInstances(Config config) {
-    return Collections.singletonList(new JobsResource(new JobsResourceConfig(config)));
+    return ImmutableList.of(new JobsResource(new JobsResourceConfig(config)),
+                            new TasksResource(new TaskResourceConfig(config)));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java
index a566db5..caad56c 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResource.java
@@ -37,7 +37,6 @@ import org.apache.samza.rest.proxy.job.JobProxyFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * The REST resource for jobs. Handles all requests for the jobs collection
  * or individual job instances.
@@ -73,7 +72,7 @@ public class JobsResource {
       return Response.ok(jobProxy.getAllJobStatuses()).build();
     } catch (Exception e) {
       log.error("Error in getInstalledJobs.", e);
-      return errorResponse(e.getMessage());
+      return Responses.errorResponse(e.getMessage());
     }
   }
 
@@ -105,7 +104,7 @@ public class JobsResource {
       return Response.ok(job).build();
     } catch (Exception e) {
       log.error("Error in getJob.", e);
-      return errorResponse(e.getMessage());
+      return Responses.errorResponse(e.getMessage());
     }
   }
 
@@ -155,21 +154,10 @@ public class JobsResource {
       }
     } catch (IllegalArgumentException e) {
       log.info(String.format("Illegal arguments updateJobStatus. JobName:%s JobId:%s Status=%s", jobName, jobId, status), e);
-      return Response.status(Response.Status.BAD_REQUEST).entity(
-          Collections.singletonMap("message", e.getMessage())).build();
+      return Responses.badRequestResponse(e.getMessage());
     } catch (Exception e) {
       log.error("Error in updateJobStatus.", e);
-      return errorResponse(String.format("Error type: %s message: %s", e.toString(), e.getMessage()));
+      return Responses.errorResponse(String.format("Error type: %s message: %s", e.toString(), e.getMessage()));
     }
   }
-
-  /**
-   * Constructs a consistent format for error responses. This method should be used for every error case.
-   *
-   * @param message the error message to report.
-   * @return        the {@link Response} containing the error message.
-   */
-  private Response errorResponse(String message) {
-    return Response.serverError().entity(Collections.singletonMap("message", message)).build();
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/26280cac/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
index 527482d..bd52e65 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/JobsResourceConfig.java
@@ -19,8 +19,6 @@
 package org.apache.samza.rest.resources;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.rest.proxy.installation.InstallationRecord;
 import org.apache.samza.rest.proxy.job.JobProxy;
 import org.apache.samza.rest.proxy.job.JobProxyFactory;
 
@@ -28,7 +26,8 @@ import org.apache.samza.rest.proxy.job.JobProxyFactory;
 /**
  * Configurations for the {@link JobsResource} endpoint.
  */
-public class JobsResourceConfig extends MapConfig {
+public class JobsResourceConfig extends BaseResourceConfig {
+
   /**
    * Specifies the canonical name of the {@link JobProxyFactory} class to produce
    * {@link JobProxy} instances.
@@ -37,48 +36,15 @@ public class JobsResourceConfig extends MapConfig {
    */
   public static final String CONFIG_JOB_PROXY_FACTORY = "job.proxy.factory.class";
 
-  /**
-   * The path where all the Samza jobs are installed (unzipped). Each subdirectory of this path
-   * is expected to be a Samza job installation and corresponds to one {@link InstallationRecord}.
-   */
-  public static final String CONFIG_JOB_INSTALLATIONS_PATH = "job.installations.path";
-
-  /**
-   * Specifies the canonical name of the {@link org.apache.samza.config.ConfigFactory} to read the job configs.
-   */
-  public static final String CONFIG_JOB_CONFIG_FACTORY = "job.config.factory.class";
-
   public JobsResourceConfig(Config config) {
     super(config);
   }
 
   /**
-   * @see JobsResourceConfig#CONFIG_JOB_CONFIG_FACTORY
+   * @see JobsResourceConfig#CONFIG_JOB_PROXY_FACTORY
    * @return the canonical name of the {@link JobProxyFactory} class to produce {@link JobProxy} instances.
    */
   public String getJobProxyFactory() {
     return get(CONFIG_JOB_PROXY_FACTORY);
   }
-
-  /**
-   * @see JobsResourceConfig#CONFIG_JOB_INSTALLATIONS_PATH
-   * @return the path where all the Samza jobs are installed (unzipped).
-   */
-  public String getInstallationsPath() {
-    return sanitizePath(get(CONFIG_JOB_INSTALLATIONS_PATH));
-  }
-
-  /**
-   * Ensures a usable file path when the user specifies a tilde for the home path.
-   *
-   * @param rawPath the original path.
-   * @return        the updated path with the tilde resolved to home.
-   */
-  private static String sanitizePath(String rawPath) {
-    if (rawPath == null) {
-      return null;
-    }
-    return rawPath.replaceFirst("^~", System.getProperty("user.home"));
-  }
-
 }