You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/05 18:57:04 UTC

[3/3] samza git commit: SAMZA-1973: Unify the TaskNameGrouper interface for yarn and standalone.

SAMZA-1973: Unify the TaskNameGrouper interface for yarn and standalone.

This patch consists of the following changes:
* Unify the different methods present in the TaskNameGrouper interface. This will enable us to have a single interface method usable for both the yarn and standalone models.
* Generate locationId-aware task assignments to processors in standalone.
* Move the task assignment persistence logic from a custom `TaskNameGrouper` implementation to `JobModelManager`, so that it works for any custom grouper.
* General code cleanup in `JobModelManager`, `TaskAssignmentManager`, and other Samza internal classes.
* Read/write taskLocality of the processors in standalone.
* Updated the existing Javadocs and added Javadocs where they were missing.

Testing:
* Fixed the existing unit tests that were affected by these changes.
* Added new unit tests for the functionality changed or added as part of this patch.
* Tested this patch with a sample job from the `hello-samza` project and verified that it works as expected.

Please refer to [SEP-11](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957309) for more details.

Author: Shanthoosh Venkataraman <sp...@usc.edu>
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Author: svenkata <sv...@linkedin.com>

Reviewers: Prateek M<pm...@linkedin.com>

Closes #790 from shanthoosh/task_name_grouper_changes


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5ea72584
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5ea72584
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5ea72584

Branch: refs/heads/master
Commit: 5ea72584f6b92937ec130f486d6f70603b7188c2
Parents: c7e5dcb
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Wed Dec 5 10:56:55 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Dec 5 10:56:55 2018 -0800

----------------------------------------------------------------------
 .../org/apache/samza/job/model/TaskModel.java   |   1 -
 .../samza/coordinator/AzureJobCoordinator.java  |   7 +-
 .../ClusterBasedJobCoordinator.java             |   3 +-
 .../samza/config/ClusterManagerConfig.java      |  14 +-
 .../grouper/task/BalancingTaskNameGrouper.java  |   5 +-
 .../grouper/task/GroupByContainerCount.java     | 228 ++++---------
 .../task/GroupByContainerCountFactory.java      |   3 +-
 .../grouper/task/GroupByContainerIds.java       | 171 ++++++++--
 .../container/grouper/task/GrouperMetadata.java |  58 ++++
 .../grouper/task/GrouperMetadataImpl.java       |  72 +++++
 .../grouper/task/TaskAssignmentManager.java     |   3 -
 .../samza/container/grouper/task/TaskGroup.java |  85 +++++
 .../container/grouper/task/TaskNameGrouper.java |  39 ++-
 .../grouper/task/TaskNameGrouperFactory.java    |   2 +-
 .../samza/execution/ExecutionPlanner.java       |   2 +-
 .../apache/samza/processor/StreamProcessor.java |   3 +-
 .../samza/runtime/LocalContainerRunner.java     |  18 +-
 .../standalone/PassthroughJobCoordinator.java   |  30 +-
 .../apache/samza/storage/StorageRecovery.java   |   5 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  74 ++++-
 .../main/java/org/apache/samza/zk/ZkUtils.java  |  27 ++
 .../apache/samza/container/SamzaContainer.scala |  12 +-
 .../samza/coordinator/JobModelManager.scala     | 231 +++++++++----
 .../samza/job/local/ProcessJobFactory.scala     |   2 +-
 .../samza/job/local/ThreadJobFactory.scala      |   2 +-
 .../grouper/task/TestGroupByContainerCount.java | 320 ++++++-------------
 .../grouper/task/TestGroupByContainerIds.java   | 292 +++++++++++++++--
 .../grouper/task/TestTaskAssignmentManager.java |   3 -
 .../samza/container/mock/ContainerMocks.java    |   6 +-
 .../coordinator/JobModelManagerTestUtil.java    |  17 +-
 .../samza/coordinator/TestJobModelManager.java  | 114 ++++++-
 .../java/org/apache/samza/zk/TestZkUtils.java   |  73 ++++-
 .../samza/container/TestSamzaContainer.scala    |  38 ++-
 .../apache/samza/test/framework/TestRunner.java |   2 +-
 .../processor/TestZkStreamProcessorBase.java    |   2 -
 .../processor/TestZkLocalApplicationRunner.java |  10 +-
 .../samza/validation/YarnJobValidationTool.java |   6 +-
 .../webapp/TestApplicationMasterRestClient.java |   4 +-
 38 files changed, 1348 insertions(+), 636 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
index 7ee7609..36917cf 100644
--- a/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -99,7 +99,6 @@ public class TaskModel implements Comparable<TaskModel> {
   }
 
   @Override
-
   public String toString() {
     return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]";
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 96f628c..076ab54 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -30,6 +30,8 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
 import org.apache.samza.coordinator.data.BarrierState;
 import org.apache.samza.coordinator.data.ProcessorEntity;
 import org.apache.samza.coordinator.scheduler.HeartbeatScheduler;
@@ -54,7 +56,6 @@ import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -365,8 +366,8 @@ public class AzureJobCoordinator implements JobCoordinator {
     }
 
     // Generate the new JobModel
-    JobModel newJobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(),
-        null, streamMetadataCache, currentProcessorIds);
+    GrouperMetadata grouperMetadata = new GrouperMetadataImpl(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+    JobModel newJobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), streamMetadataCache, grouperMetadata);
     LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
 
     // Publish the new job model

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 4c5a34b..0eddbf2 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -185,8 +185,7 @@ public class ClusterBasedJobCoordinator {
 
     // build a JobModelManager and ChangelogStreamManager and perform partition assignments.
     changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
-    jobModelManager =
-        JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
+    jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping(), metrics);
 
     config = jobModelManager.jobModel().getConfig();
     hasDurableStores = new StorageConfig(config).hasDurableStores();

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index cb86a58..eda1be8 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -52,10 +52,14 @@ public class ClusterManagerConfig extends MapConfig {
   private static final int DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS = 5000;
 
   /**
-   * Flag to indicate if host-affinity is enabled for the job or not
+   * NOTE: This field is deprecated.
    */
   public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";
-  public static final String CLUSTER_MANAGER_HOST_AFFINITY_ENABLED = "job.host-affinity.enabled";
+
+  /**
+   * Flag to indicate if host-affinity is enabled for the job or not
+   */
+  public static final String JOB_HOST_AFFINITY_ENABLED = "job.host-affinity.enabled";
 
   /**
    * Number of CPU cores to request from the cluster manager per container
@@ -145,10 +149,10 @@ public class ClusterManagerConfig extends MapConfig {
   }
 
   public boolean getHostAffinityEnabled() {
-    if (containsKey(CLUSTER_MANAGER_HOST_AFFINITY_ENABLED)) {
-      return getBoolean(CLUSTER_MANAGER_HOST_AFFINITY_ENABLED);
+    if (containsKey(JOB_HOST_AFFINITY_ENABLED)) {
+      return getBoolean(JOB_HOST_AFFINITY_ENABLED);
     } else if (containsKey(HOST_AFFINITY_ENABLED)) {
-      log.info("Configuration {} is deprecated. Please use {}", HOST_AFFINITY_ENABLED, CLUSTER_MANAGER_HOST_AFFINITY_ENABLED);
+      log.warn("Configuration {} is deprecated. Please use {}", HOST_AFFINITY_ENABLED, JOB_HOST_AFFINITY_ENABLED);
       return getBoolean(HOST_AFFINITY_ENABLED);
     } else {
       return false;

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
index f8295c8..91eab54 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
@@ -54,5 +54,8 @@ public interface BalancingTaskNameGrouper extends TaskNameGrouper {
    * @param localityManager provides a persisted task to container map to use as a baseline
    * @return                the grouped tasks in the form of ContainerModels
    */
-  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
+  @Deprecated
+  default Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager) {
+    return group(tasks);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 759f82e..8a741db 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -27,19 +27,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.SamzaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Group the SSP taskNames by dividing the number of taskNames into the number
  * of containers (n) and assigning n taskNames to each container as returned by
@@ -51,19 +45,21 @@ import org.slf4j.LoggerFactory;
  * TODO: SAMZA-1197 - need to modify balance to work with processorId strings
  */
 public class GroupByContainerCount implements BalancingTaskNameGrouper {
-  private static final Logger log = LoggerFactory.getLogger(GroupByContainerCount.class);
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByContainerCount.class);
   private final int containerCount;
-  private final Config config;
 
-  public GroupByContainerCount(Config config) {
-    this.containerCount = new JobConfig(config).getContainerCount();
-    this.config = config;
-    if (containerCount <= 0) throw new IllegalArgumentException("Must have at least one container");
+  public GroupByContainerCount(int containerCount) {
+    if (containerCount <= 0) {
+      throw new IllegalArgumentException("Must have at least one container");
+    }
+    this.containerCount = containerCount;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public Set<ContainerModel> group(Set<TaskModel> tasks) {
-
     validateTasks(tasks);
 
     // Sort tasks by taskName.
@@ -89,79 +85,63 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
     return Collections.unmodifiableSet(containerModels);
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
-  public Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager) {
-
+  public Set<ContainerModel> group(Set<TaskModel> tasks, GrouperMetadata grouperMetadata) {
     validateTasks(tasks);
 
-    if (localityManager == null) {
-      log.info("Locality manager is null. Cannot read or write task assignments. Invoking grouper.");
+    List<TaskGroup> containers = getPreviousContainers(grouperMetadata, tasks.size());
+    if (containers == null || containers.size() == 1 || containerCount == 1) {
+      LOG.info("Balancing does not apply. Invoking grouper.");
       return group(tasks);
     }
 
-    TaskAssignmentManager taskAssignmentManager =  new TaskAssignmentManager(config, new MetricsRegistryMap());
-    taskAssignmentManager.init();
-    try {
-      List<TaskGroup> containers = getPreviousContainers(taskAssignmentManager, tasks.size());
-      if (containers == null || containers.size() == 1 || containerCount == 1) {
-        log.info("Balancing does not apply. Invoking grouper.");
-        Set<ContainerModel> models = group(tasks);
-        saveTaskAssignments(models, taskAssignmentManager);
-        return models;
-      }
-
-      int prevContainerCount = containers.size();
-      int containerDelta = containerCount - prevContainerCount;
-      if (containerDelta == 0) {
-        log.info("Container count has not changed. Reusing previous container models.");
-        return buildContainerModels(tasks, containers);
-      }
-      log.info("Container count changed from {} to {}. Balancing tasks.", prevContainerCount, containerCount);
-
-      // Calculate the expected task count per container
-      int[] expectedTaskCountPerContainer = calculateTaskCountPerContainer(tasks.size(), prevContainerCount, containerCount);
+    int prevContainerCount = containers.size();
+    int containerDelta = containerCount - prevContainerCount;
+    if (containerDelta == 0) {
+      LOG.info("Container count has not changed. Reusing previous container models.");
+      return TaskGroup.buildContainerModels(tasks, containers);
+    }
+    LOG.info("Container count changed from {} to {}. Balancing tasks.", prevContainerCount, containerCount);
 
-      // Collect excess tasks from over-assigned containers
-      List<String> taskNamesToReassign = new LinkedList<>();
-      for (int i = 0; i < prevContainerCount; i++) {
-        TaskGroup taskGroup = containers.get(i);
-        while (taskGroup.size() > expectedTaskCountPerContainer[i]) {
-          taskNamesToReassign.add(taskGroup.removeTask());
-        }
-      }
+    // Calculate the expected task count per container
+    int[] expectedTaskCountPerContainer = calculateTaskCountPerContainer(tasks.size(), prevContainerCount, containerCount);
 
-      // Assign tasks to the under-assigned containers
-      if (containerDelta > 0) {
-        List<TaskGroup> newContainers = createContainers(prevContainerCount, containerCount);
-        containers.addAll(newContainers);
-      } else {
-        containers = containers.subList(0, containerCount);
+    // Collect excess tasks from over-assigned containers
+    List<String> taskNamesToReassign = new LinkedList<>();
+    for (int i = 0; i < prevContainerCount; i++) {
+      TaskGroup taskGroup = containers.get(i);
+      while (taskGroup.size() > expectedTaskCountPerContainer[i]) {
+        taskNamesToReassign.add(taskGroup.removeLastTaskName());
       }
-      assignTasksToContainers(expectedTaskCountPerContainer, taskNamesToReassign, containers);
+    }
 
-      // Transform containers to containerModel
-      Set<ContainerModel> models = buildContainerModels(tasks, containers);
+    // Assign tasks to the under-assigned containers
+    if (containerDelta > 0) {
+      List<TaskGroup> newContainers = createContainers(prevContainerCount, containerCount);
+      containers.addAll(newContainers);
+    } else {
+      containers = containers.subList(0, containerCount);
+    }
 
-      // Save the results
-      saveTaskAssignments(models, taskAssignmentManager);
+    assignTasksToContainers(expectedTaskCountPerContainer, taskNamesToReassign, containers);
 
-      return models;
-    } finally {
-      taskAssignmentManager.close();
-    }
+    return TaskGroup.buildContainerModels(tasks, containers);
   }
 
   /**
-   * Reads the task-container mapping from the provided {@link TaskAssignmentManager} and returns a
+   * Reads the task-container mapping from the provided {@link GrouperMetadata} and returns a
    * list of TaskGroups, ordered ascending by containerId.
    *
-   * @param taskAssignmentManager the {@link TaskAssignmentManager} that will be used to retrieve the previous mapping.
-   * @param taskCount             the number of tasks, for validation against the persisted tasks.
-   * @return                      a list of TaskGroups, ordered ascending by containerId or {@code null}
-   *                              if the previous mapping doesn't exist or isn't usable.
+   * @param grouperMetadata  the {@link GrouperMetadata} will be used to retrieve the previous task to container assignments.
+   * @param taskCount       the number of tasks, for validation against the persisted tasks.
+   * @return                a list of TaskGroups, ordered ascending by containerId or {@code null}
+   *                        if the previous mapping doesn't exist or isn't usable.
    */
-  private List<TaskGroup> getPreviousContainers(TaskAssignmentManager taskAssignmentManager, int taskCount) {
-    Map<String, String> taskToContainerId = taskAssignmentManager.readTaskAssignment();
+  private List<TaskGroup> getPreviousContainers(GrouperMetadata grouperMetadata, int taskCount) {
+    Map<TaskName, String> taskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment();
     taskToContainerId.values().forEach(id -> {
         try {
           int intId = Integer.parseInt(id);
@@ -169,19 +149,11 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
           throw new SamzaException("GroupByContainerCount cannot handle non-integer processorIds!", nfe);
         }
       });
+
     if (taskToContainerId.isEmpty()) {
-      log.info("No task assignment map was saved.");
+      LOG.info("No task assignment map was saved.");
       return null;
     } else if (taskCount != taskToContainerId.size()) {
-      log.warn(
-          "Current task count {} does not match saved task count {}. Stateful jobs may observe misalignment of keys!",
-          taskCount, taskToContainerId.size());
-      // If the tasks changed, then the partition-task grouping is also likely changed and we can't handle that
-      // without a much more complicated mapping. Further, the partition count may have changed, which means
-      // input message keys are likely reshuffled w.r.t. partitions, so the local state may not contain necessary
-      // data associated with the incoming keys. Warn the user and default to grouper
-      // In this scenario the tasks may have been reduced, so we need to delete all the existing messages
-      taskAssignmentManager.deleteTaskContainerMappings(taskToContainerId.keySet());
       return null;
     }
 
@@ -189,27 +161,13 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
     try {
       containers = getOrderedContainers(taskToContainerId);
     } catch (Exception e) {
-      log.error("Exception while parsing task mapping", e);
+      LOG.error("Exception while parsing task mapping", e);
       return null;
     }
     return containers;
   }
 
   /**
-   * Saves the task assignments specified by containers using the provided TaskAssignementManager.
-   *
-   * @param containers            the set of containers from which the task assignments will be saved.
-   * @param taskAssignmentManager the {@link TaskAssignmentManager} that will be used to save the mappings.
-   */
-  private void saveTaskAssignments(Set<ContainerModel> containers, TaskAssignmentManager taskAssignmentManager) {
-    for (ContainerModel container : containers) {
-      for (TaskName taskName : container.getTasks().keySet()) {
-        taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getId());
-      }
-    }
-  }
-
-  /**
    * Verifies the input tasks argument and throws {@link IllegalArgumentException} if it is invalid.
    *
    * @param tasks the tasks to validate.
@@ -252,13 +210,12 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
    * @param containers            the containers (as {@link TaskGroup}) to which the tasks will be assigned.
    */
   // TODO: Change logic from using int arrays to a Map<String, Integer> (id -> taskCount)
-  private void assignTasksToContainers(int[] taskCountPerContainer, List<String> taskNamesToAssign,
-      List<TaskGroup> containers) {
+  private void assignTasksToContainers(int[] taskCountPerContainer, List<String> taskNamesToAssign, List<TaskGroup> containers) {
     for (TaskGroup taskGroup : containers) {
       for (int j = taskGroup.size(); j < taskCountPerContainer[Integer.valueOf(taskGroup.getContainerId())]; j++) {
         String taskName = taskNamesToAssign.remove(0);
         taskGroup.addTaskName(taskName);
-        log.info("Assigned task {} to container {}", taskName, taskGroup.getContainerId());
+        LOG.info("Assigned task {} to container {}", taskName, taskGroup.getContainerId());
       }
     }
   }
@@ -288,53 +245,20 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
   }
 
   /**
-   * Translates the list of TaskGroup instances to a set of ContainerModel instances, using the
-   * set of TaskModel instances.
-   *
-   * @param tasks             the TaskModels to assign to the ContainerModels.
-   * @param containerTasks    the TaskGroups defining how the tasks should be grouped.
-   * @return                  a mutable set of ContainerModels.
-   */
-  private Set<ContainerModel> buildContainerModels(Set<TaskModel> tasks, List<TaskGroup> containerTasks) {
-    // Map task names to models
-    Map<String, TaskModel> taskNameToModel = new HashMap<>();
-    for (TaskModel model : tasks) {
-      taskNameToModel.put(model.getTaskName().getTaskName(), model);
-    }
-
-    // Build container models
-    Set<ContainerModel> containerModels = new HashSet<>();
-    for (TaskGroup container : containerTasks) {
-      Map<TaskName, TaskModel> containerTaskModels = new HashMap<>();
-      for (String taskName : container.taskNames) {
-        TaskModel model = taskNameToModel.get(taskName);
-        containerTaskModels.put(model.getTaskName(), model);
-      }
-      containerModels.add(
-          new ContainerModel(container.containerId, containerTaskModels));
-    }
-    return Collections.unmodifiableSet(containerModels);
-  }
-
-  /**
    * Converts the task->containerId map to an ordered list of {@link TaskGroup} instances.
    *
    * @param taskToContainerId a map from each task name to the containerId to which it is assigned.
    * @return                  a list of TaskGroups ordered ascending by containerId.
    */
-  private List<TaskGroup> getOrderedContainers(Map<String, String> taskToContainerId) {
-    log.debug("Got task to container map: {}", taskToContainerId);
+  private List<TaskGroup> getOrderedContainers(Map<TaskName, String> taskToContainerId) {
+    LOG.debug("Got task to container map: {}", taskToContainerId);
 
     // Group tasks by container Id
-    HashMap<String, List<String>> containerIdToTaskNames = new HashMap<>();
-    for (Map.Entry<String, String> entry : taskToContainerId.entrySet()) {
-      String taskName = entry.getKey();
+    Map<String, List<String>> containerIdToTaskNames = new HashMap<>();
+    for (Map.Entry<TaskName, String> entry : taskToContainerId.entrySet()) {
+      String taskName = entry.getKey().getTaskName();
       String containerId = entry.getValue();
-      List<String> taskNames = containerIdToTaskNames.get(containerId);
-      if (taskNames == null) {
-        taskNames = new ArrayList<>();
-        containerIdToTaskNames.put(containerId, taskNames);
-      }
+      List<String> taskNames = containerIdToTaskNames.computeIfAbsent(containerId, k -> new ArrayList<>());
       taskNames.add(taskName);
     }
 
@@ -347,36 +271,4 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
 
     return containerTasks;
   }
-
-  /**
-   * A mutable group of tasks and an associated container id.
-   *
-   * Used as a temporary mutable container until the final ContainerModel is known.
-   */
-  private static class TaskGroup {
-    private final List<String> taskNames = new LinkedList<>();
-    private final String containerId;
-
-    private TaskGroup(String containerId, List<String> taskNames) {
-      this.containerId = containerId;
-      Collections.sort(taskNames);        // For consistency because the taskNames came from a Map
-      this.taskNames.addAll(taskNames);
-    }
-
-    public String getContainerId() {
-      return containerId;
-    }
-
-    public void addTaskName(String taskName) {
-      taskNames.add(taskName);
-    }
-
-    public String removeTask() {
-      return taskNames.remove(taskNames.size() - 1);
-    }
-
-    public int size() {
-      return taskNames.size();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
index 06aba33..5acf5b8 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
@@ -19,6 +19,7 @@
 package org.apache.samza.container.grouper.task;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 
 /**
  * Factory to build the GroupByContainerCount class.
@@ -26,6 +27,6 @@ import org.apache.samza.config.Config;
 public class GroupByContainerCountFactory implements TaskNameGrouperFactory {
   @Override
   public TaskNameGrouper build(Config config) {
-    return new GroupByContainerCount(config);
+    return new GroupByContainerCount(new JobConfig(config).getContainerCount());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
index 9dab943..7c11da4 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
@@ -19,27 +19,34 @@
 
 package org.apache.samza.container.grouper.task;
 
-import java.util.Arrays;
-import java.util.stream.Collectors;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.TaskModel;
-
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.runtime.LocationId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * Simple grouper.
- * It exposes two group methods - one that assumes sequential container numbers and one that gets a set of container
- * IDs as an argument. Please note - this first implementation ignores locality information.
+ * A {@link TaskNameGrouper} implementation that provides two different grouping strategies:
+ *
+ * - One that assigns the tasks to the available containerIds in a round robin fashion.
+ * - The other that generates a equidistributed and locality-aware task to container assignment.
  */
 public class GroupByContainerIds implements TaskNameGrouper {
   private static final Logger LOG = LoggerFactory.getLogger(GroupByContainerIds.class);
@@ -49,6 +56,9 @@ public class GroupByContainerIds implements TaskNameGrouper {
     this.startContainerCount = count;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public Set<ContainerModel> group(Set<TaskModel> tasks) {
     List<String> containerIds = new ArrayList<>(startContainerCount);
@@ -58,30 +68,40 @@ public class GroupByContainerIds implements TaskNameGrouper {
     return group(tasks, containerIds);
   }
 
-  public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
-    if (containersIds == null)
+  /**
+   * {@inheritDoc}
+   *
+   * When the number of taskModels is less than the number of available containerIds,
+   * selects the lexicographically least `x` containerIds (where x is the number of taskModels).
+   *
+   * Otherwise, assigns the tasks to the available containerIds in a round robin fashion
+   * preserving the containerId in the final assignment.
+   */
+  @Override
+  public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containerIds) {
+    if (containerIds == null)
       return this.group(tasks);
 
-    if (containersIds.isEmpty())
+    if (containerIds.isEmpty())
       throw new IllegalArgumentException("Must have at least one container");
 
     if (tasks.isEmpty())
-      throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays
-          .toString(containersIds.toArray()));
+      throw new IllegalArgumentException("cannot group an empty set. containerIds=" + Arrays
+          .toString(containerIds.toArray()));
 
-    if (containersIds.size() > tasks.size()) {
-      LOG.warn("Number of containers: {} is greater than number of tasks: {}.",  containersIds.size(), tasks.size());
+    if (containerIds.size() > tasks.size()) {
+      LOG.warn("Number of containers: {} is greater than number of tasks: {}.",  containerIds.size(), tasks.size());
       /**
        * Choose lexicographically least `x` containerIds(where x = tasks.size()).
        */
-      containersIds = containersIds.stream()
+      containerIds = containerIds.stream()
                                    .sorted()
                                    .limit(tasks.size())
                                    .collect(Collectors.toList());
-      LOG.info("Generating containerModel with containers: {}.", containersIds);
+      LOG.info("Generating containerModel with containers: {}.", containerIds);
     }
 
-    int containerCount = containersIds.size();
+    int containerCount = containerIds.size();
 
     // Sort tasks by taskName.
     List<TaskModel> sortedTasks = new ArrayList<>(tasks);
@@ -100,9 +120,118 @@ public class GroupByContainerIds implements TaskNameGrouper {
     // Convert to a Set of ContainerModel
     Set<ContainerModel> containerModels = new HashSet<>();
     for (int i = 0; i < containerCount; i++) {
-      containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i]));
+      containerModels.add(new ContainerModel(containerIds.get(i), taskGroups[i]));
     }
 
     return Collections.unmodifiableSet(containerModels);
   }
+
+  /**
+   * {@inheritDoc}
+   *
+   * When there are `t` tasks and `p` processors, where t &gt;= p, a fair task distribution should ideally assign
+   * (t / p) tasks to each processor. In addition to guaranteeing a fair distribution, this {@link TaskNameGrouper}
+   * implementation generates a locationId aware task assignment to processors where it makes best efforts in assigning
+   * the tasks to processors with the same locality.
+   *
+   * If there are more processors than tasks, only the lexicographically least `t` processorIds are used.
+   * If the processor locality is empty, this falls back to {@link #group(Set)}.
+   *
+   * Task assignment to processors is accomplished through the following two phases:
+   *
+   * 1. In the first phase, each task(T) is assigned to a processor(P) that satisfies the following constraints:
+   *    A. The processor(P) should have the same locality of the task(T).
+   *    B. Number of tasks already assigned to the processor should be less than the (number of tasks / number of processors).
+   *
+   * 2. Each unassigned task from phase 1 is then mapped to any processor with task count less than the
+   * (number of tasks / number of processors). When no such processor exists, then the unassigned
+   * task is mapped to any processor from available processors in a round robin fashion.
+   *
+   * Tasks are visited in task-name order and processors in processorId order, so the same input
+   * always yields the same assignment.
+   *
+   * @throws IllegalArgumentException if taskModels is empty.
+   */
+  @Override
+  public Set<ContainerModel> group(Set<TaskModel> taskModels, GrouperMetadata grouperMetadata) {
+    // Validate that the task models are not empty.
+    Preconditions.checkArgument(!taskModels.isEmpty(), "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.");
+
+    // Invoke the default grouper when the processor locality does not exist.
+    // NOTE: group(taskModels, emptyList) must not be used here: it rejects an empty list of containerIds.
+    if (MapUtils.isEmpty(grouperMetadata.getProcessorLocality())) {
+      LOG.info("ProcessorLocality is empty. Generating with the default group method.");
+      return group(taskModels);
+    }
+
+    Map<TaskName, LocationId> taskLocality = MapUtils.emptyIfNull(grouperMetadata.getTaskLocality());
+
+    /*
+     * When there are more processors than task models, choose the lexicographically least `x` processors
+     * (where x = tasks.size()). Collect into a TreeMap so that the processorId ordering relied upon below
+     * (per-location processor lists and the round robin iterator) is preserved.
+     */
+    Map<String, LocationId> processorLocality = new TreeMap<>(grouperMetadata.getProcessorLocality());
+    if (processorLocality.size() > taskModels.size()) {
+      processorLocality = processorLocality.entrySet()
+                                           .stream()
+                                           .limit(taskModels.size())
+                                           .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
+                                               (first, second) -> first, TreeMap::new));
+    }
+
+    // Visit the tasks in task-name order so the assignment does not depend on the iteration order of the input Set.
+    List<TaskModel> sortedTaskModels = new ArrayList<>(taskModels);
+    sortedTaskModels.sort(Comparator.comparing((TaskModel taskModel) -> taskModel.getTaskName().getTaskName()));
+
+    Map<LocationId, List<String>> locationIdToProcessors = new HashMap<>();
+    Map<String, TaskGroup> processorIdToTaskGroup = new HashMap<>();
+
+    // Generate the LocationId to processors mapping and the processorId to TaskGroup mapping.
+    // Processors are visited in processorId order, so every per-location list is sorted by processorId.
+    processorLocality.forEach((processorId, locationId) -> {
+        locationIdToProcessors.computeIfAbsent(locationId, key -> new ArrayList<>()).add(processorId);
+        processorIdToTaskGroup.put(processorId, new TaskGroup(processorId, new ArrayList<>()));
+      });
+
+    int numTasksPerProcessor = taskModels.size() / processorLocality.size();
+    Set<TaskName> assignedTasks = new HashSet<>();
+
+    /*
+     * Phase 1: a processor is considered under-assigned when the number of tasks assigned to it is less than
+     * (number of tasks / number of processors). Map each task to an under-assigned processor with the same locality.
+     */
+    for (TaskModel taskModel : sortedTaskModels) {
+      LocationId taskLocationId = taskLocality.get(taskModel.getTaskName());
+      if (taskLocationId != null) {
+        List<String> processorIds = locationIdToProcessors.getOrDefault(taskLocationId, Collections.emptyList());
+        for (String processorId : processorIds) {
+          TaskGroup taskGroup = processorIdToTaskGroup.get(processorId);
+          if (taskGroup.size() < numTasksPerProcessor) {
+            taskGroup.addTaskName(taskModel.getTaskName().getTaskName());
+            assignedTasks.add(taskModel.getTaskName());
+            break;
+          }
+        }
+      }
+    }
+
+    /*
+     * In some scenarios, the task either might not have any previous locality or might not have any
+     * processor that maps to its previous locality. This cyclic processorId iterator (in processorId order)
+     * assigns the processorIds to those kinds of tasks in a round robin fashion.
+     */
+    Iterator<String> processorIdsCyclicIterator = Iterators.cycle(processorLocality.keySet());
+
+    // Order the taskGroups to choose a task group in a deterministic fashion for unassigned tasks.
+    List<TaskGroup> taskGroups = new ArrayList<>(processorIdToTaskGroup.values());
+    taskGroups.sort(Comparator.comparing(TaskGroup::getContainerId));
+
+    /*
+     * Phase 2: map the tasks left over from phase 1 to any under-assigned processor.
+     * When an under-assigned processor doesn't exist, map them to any processor from the
+     * available processors in a round robin manner.
+     */
+    for (TaskModel taskModel : sortedTaskModels) {
+      if (!assignedTasks.contains(taskModel.getTaskName())) {
+        Optional<TaskGroup> underAssignedTaskGroup = taskGroups.stream()
+                .filter(taskGroup -> taskGroup.size() < numTasksPerProcessor)
+                .findFirst();
+        if (underAssignedTaskGroup.isPresent()) {
+          underAssignedTaskGroup.get().addTaskName(taskModel.getTaskName().getTaskName());
+        } else {
+          TaskGroup taskGroup = processorIdToTaskGroup.get(processorIdsCyclicIterator.next());
+          taskGroup.addTaskName(taskModel.getTaskName().getTaskName());
+        }
+        assignedTasks.add(taskModel.getTaskName());
+      }
+    }
+
+    return TaskGroup.buildContainerModels(taskModels, taskGroups);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java
new file mode 100644
index 0000000..ef919d1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.SystemStreamPartition;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Historical metadata of a samza application that a {@link TaskNameGrouper} can consult
+ * when computing a task assignment.
+ */
+@InterfaceStability.Evolving
+public interface GrouperMetadata {
+
+  /**
+   * Returns the current locality of every processor in the job.
+   *
+   * @return a map from processorId to the {@link LocationId} of that processor.
+   */
+  Map<String, LocationId> getProcessorLocality();
+
+  /**
+   * Returns the current locality of every task in the job.
+   *
+   * @return a map from {@link TaskName} to the {@link LocationId} of that task.
+   */
+  Map<TaskName, LocationId> getTaskLocality();
+
+  /**
+   * Returns the {@link SystemStreamPartition}s that were previously assigned to each task.
+   *
+   * @return a map from {@link TaskName} to its previously assigned {@link SystemStreamPartition}s.
+   */
+  Map<TaskName, List<SystemStreamPartition>> getPreviousTaskToSSPAssignment();
+
+  /**
+   * Returns the processor that each task was previously assigned to.
+   *
+   * @return a map from {@link TaskName} to the processorId it was previously assigned to.
+   */
+  Map<TaskName, String> getPreviousTaskToProcessorAssignment();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java
new file mode 100644
index 0000000..bc40bc4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import org.apache.samza.container.TaskName;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.SystemStreamPartition;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link GrouperMetadata} backed by the maps supplied at construction time. It carries the historical
+ * metadata of a samza job that a {@link TaskNameGrouper} uses to generate task assignments.
+ *
+ * <p>Each supplied map is exposed through an unmodifiable view. The maps are not copied, so changes the
+ * caller later makes to the originals remain visible through this object.
+ */
+public class GrouperMetadataImpl implements GrouperMetadata {
+
+  /** processorId to the {@link LocationId} of that processor. */
+  private final Map<String, LocationId> processorLocality;
+
+  /** {@link TaskName} to the {@link LocationId} of that task. */
+  private final Map<TaskName, LocationId> taskLocality;
+
+  /** {@link TaskName} to the input {@link SystemStreamPartition}s previously assigned to it. */
+  private final Map<TaskName, List<SystemStreamPartition>> previousTaskToSSPAssignment;
+
+  /** {@link TaskName} to the processorId it was previously assigned to. */
+  private final Map<TaskName, String> previousTaskToProcessorAssignment;
+
+  /**
+   * @param processorLocality processorId to {@link LocationId} mapping.
+   * @param taskLocality {@link TaskName} to {@link LocationId} mapping.
+   * @param previousTaskToSSPAssignments previous {@link TaskName} to {@link SystemStreamPartition}s mapping.
+   * @param previousTaskToProcessorAssignment previous {@link TaskName} to processorId mapping.
+   * @throws NullPointerException if any of the maps is null.
+   */
+  public GrouperMetadataImpl(Map<String, LocationId> processorLocality,
+                             Map<TaskName, LocationId> taskLocality,
+                             Map<TaskName, List<SystemStreamPartition>> previousTaskToSSPAssignments,
+                             Map<TaskName, String> previousTaskToProcessorAssignment) {
+    this.processorLocality = Collections.unmodifiableMap(processorLocality);
+    this.taskLocality = Collections.unmodifiableMap(taskLocality);
+    this.previousTaskToSSPAssignment = Collections.unmodifiableMap(previousTaskToSSPAssignments);
+    this.previousTaskToProcessorAssignment = Collections.unmodifiableMap(previousTaskToProcessorAssignment);
+  }
+
+  @Override
+  public Map<String, LocationId> getProcessorLocality() {
+    return this.processorLocality;
+  }
+
+  @Override
+  public Map<TaskName, LocationId> getTaskLocality() {
+    return this.taskLocality;
+  }
+
+  @Override
+  public Map<TaskName, List<SystemStreamPartition>> getPreviousTaskToSSPAssignment() {
+    return this.previousTaskToSSPAssignment;
+  }
+
+  @Override
+  public Map<TaskName, String> getPreviousTaskToProcessorAssignment() {
+    return this.previousTaskToProcessorAssignment;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 32bbf29..b6e946c 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -81,9 +81,6 @@ public class TaskAssignmentManager {
     this.valueSerde = valueSerde;
     MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
     this.metadataStore = metadataStoreFactory.getMetadataStore(SetTaskContainerMapping.TYPE, config, metricsRegistry);
-  }
-
-  public void init() {
     this.metadataStore.init();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java
new file mode 100644
index 0000000..1fe0f40
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+
+import java.util.*;
+
+/**
+ * A mutable group of task names together with the id of the container they are assigned to.
+ *
+ * Used as a temporary, mutable holder while a grouping is being computed, until the final
+ * {@link ContainerModel}s are built with {@link #buildContainerModels(Set, Collection)}.
+ */
+class TaskGroup {
+  private final List<String> taskNames = new LinkedList<>();
+  private final String containerId;
+
+  /**
+   * @param containerId the id of the container this group is assigned to.
+   * @param taskNames   the initial task names of this group; they are copied and sorted.
+   */
+  TaskGroup(String containerId, List<String> taskNames) {
+    this.containerId = containerId;
+    this.taskNames.addAll(taskNames);
+    Collections.sort(this.taskNames); // For consistency because the taskNames came from a Map
+  }
+
+  public String getContainerId() {
+    return containerId;
+  }
+
+  /**
+   * Appends a task name to the end of this group.
+   */
+  public void addTaskName(String taskName) {
+    taskNames.add(taskName);
+  }
+
+  /**
+   * Removes and returns the last task name of this group.
+   *
+   * @throws IndexOutOfBoundsException if the group is empty.
+   */
+  public String removeLastTaskName() {
+    return taskNames.remove(taskNames.size() - 1);
+  }
+
+  /**
+   * @return the number of task names in this group.
+   */
+  public int size() {
+    return taskNames.size();
+  }
+
+  /**
+   * Converts the {@link TaskGroup} list to a set of ContainerModel.
+   *
+   * @param taskModels    the TaskModels to assign to the ContainerModels.
+   * @param taskGroups    the TaskGroups defining how the tasks should be grouped.
+   * @return              an unmodifiable set of ContainerModels, one per TaskGroup.
+   * @throws IllegalArgumentException if a TaskGroup references a task name that has no TaskModel in taskModels.
+   */
+  public static Set<ContainerModel> buildContainerModels(Set<TaskModel> taskModels, Collection<TaskGroup> taskGroups) {
+    // Map task names to models
+    Map<String, TaskModel> taskNameToModel = new HashMap<>();
+    for (TaskModel model : taskModels) {
+      taskNameToModel.put(model.getTaskName().getTaskName(), model);
+    }
+
+    // Build container models
+    Set<ContainerModel> containerModels = new HashSet<>();
+    for (TaskGroup container : taskGroups) {
+      Map<TaskName, TaskModel> containerTaskModels = new HashMap<>();
+      for (String taskName : container.taskNames) {
+        TaskModel model = taskNameToModel.get(taskName);
+        if (model == null) {
+          // Fail with a descriptive error instead of an opaque NullPointerException below.
+          throw new IllegalArgumentException(String.format(
+              "Task: %s assigned to container: %s does not have a corresponding TaskModel.", taskName, container.containerId));
+        }
+        containerTaskModels.put(model.getTaskName(), model);
+      }
+      containerModels.add(new ContainerModel(container.containerId, containerTaskModels));
+    }
+
+    return Collections.unmodifiableSet(containerModels);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
index 71b80cc..2124dfc 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
@@ -18,10 +18,9 @@
  */
 package org.apache.samza.container.grouper.task;
 
-import java.util.List;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
-
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -44,15 +43,39 @@ import java.util.Set;
  * </p>
  */
 public interface TaskNameGrouper {
+
   /**
-   * Group tasks into the containers they will share.
+   * Groups the taskModels into set of {@link ContainerModel} using the metadata of
+   * the job from {@link GrouperMetadata}.
+   *
+   * The default implementation ignores the metadata and delegates to {@link #group(Set)}.
    *
-   * @param tasks Set of tasks to group into containers.
-   * @return Set of containers, which contain the tasks that were passed in.
+   * @param taskModels the set of tasks to group into containers.
+   * @param grouperMetadata provides the historical metadata of the samza job.
+   * @return the grouped {@link ContainerModel} built from the provided taskModels.
    */
-  Set<ContainerModel> group(Set<TaskModel> tasks);
+  default Set<ContainerModel> group(Set<TaskModel> taskModels, GrouperMetadata grouperMetadata) {
+    return group(taskModels);
+  }
 
-  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
-    return group(tasks);
+  /**
+   * Group the taskModels into set of {@link ContainerModel}.
+   *
+   * @param taskModels the set of {@link TaskModel} to group into containers.
+   * @return the grouped {@link ContainerModel} built from the provided taskModels.
+   * @throws UnsupportedOperationException if the implementation does not override this method.
+   * @deprecated implement {@link #group(Set, GrouperMetadata)} instead.
+   */
+  @Deprecated
+  default Set<ContainerModel> group(Set<TaskModel> taskModels) {
+    throw new UnsupportedOperationException(String.format(
+        "%s does not implement group(Set<TaskModel>); override group(Set<TaskModel>, GrouperMetadata) instead.",
+        getClass().getName()));
+  }
+
+  /**
+   * Group the taskModels into set of {@link ContainerModel}.
+   *
+   * The default implementation ignores the container ids and delegates to {@link #group(Set)}.
+   *
+   * @param taskModels the set of {@link TaskModel} to group into containers.
+   * @param containersIds the list of container ids to use in the resulting {@link ContainerModel}s.
+   * @return the grouped {@link ContainerModel} built from the provided taskModels.
+   * @deprecated implement {@link #group(Set, GrouperMetadata)} instead.
+   */
+  @Deprecated
+  default Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> containersIds) {
+    return group(taskModels);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
index 8b967b7..37684f4 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
@@ -28,7 +28,7 @@ public interface TaskNameGrouperFactory {
   * Builds a {@link TaskNameGrouper}. The config can be used to read the necessary values which are needed in the
   * process of building the {@link TaskNameGrouper}.
    *
-   * @param config configuration to which values can be used to build a {@link TaskNameGrouper}
+   * @param config configuration to use for building the {@link TaskNameGrouper}
    * @return a {@link TaskNameGrouper} implementation
    */
   TaskNameGrouper build(Config config);

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 0110551..0c5e368 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -104,7 +104,7 @@ public class ExecutionPlanner {
     // currently we don't support host-affinity in batch mode
     if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH && clusterConfig.getHostAffinityEnabled()) {
       throw new SamzaException(String.format("Host affinity is not supported in batch mode. Please configure %s=false.",
-          ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED));
+          ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 7328bc7..34d67cc 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -317,7 +317,8 @@ public class StreamProcessor {
     return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
         this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
         Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
-        Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)));
+        Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
+        null);
   }
 
   private JobCoordinator createJobCoordinator() {

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index c5c0d78..a5a45ba 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -22,6 +22,14 @@ package org.apache.samza.runtime;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+
+import org.apache.samza.container.ContainerHeartbeatClient;
+import org.apache.samza.container.ContainerHeartbeatMonitor;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.container.SamzaContainerListener;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.MDC;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
@@ -31,11 +39,6 @@ import org.apache.samza.application.ApplicationUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.container.ContainerHeartbeatClient;
-import org.apache.samza.container.ContainerHeartbeatMonitor;
-import org.apache.samza.container.SamzaContainer;
-import org.apache.samza.container.SamzaContainer$;
-import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsReporter;
@@ -47,7 +50,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 
-
 /**
  * Launches and manages the lifecycle for {@link SamzaContainer}s in YARN.
  */
@@ -93,6 +95,7 @@ public class LocalContainerRunner {
   private static void run(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId,
       JobModel jobModel, Config config) {
     TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
+    LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());
     SamzaContainer container = SamzaContainer$.MODULE$.apply(
         containerId,
         jobModel,
@@ -100,7 +103,8 @@ public class LocalContainerRunner {
         taskFactory,
         JobContextImpl.fromConfigWithDefaults(config),
         Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
-        Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)));
+        Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
+        localityManager);
 
     ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
         .createInstance(new ProcessorContext() { }, config);

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 737ac3e..44fd811 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -18,16 +18,22 @@
  */
 package org.apache.samza.standalone;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.checkpoint.CheckpointManager;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.runtime.LocationIdProvider;
+import org.apache.samza.runtime.LocationIdProviderFactory;
 import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
@@ -35,7 +41,6 @@ import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Collections;
 
 /**
@@ -65,11 +70,15 @@ public class PassthroughJobCoordinator implements JobCoordinator {
   private static final Logger LOGGER = LoggerFactory.getLogger(PassthroughJobCoordinator.class);
   private final String processorId;
   private final Config config;
+  private final LocationId locationId;
   private JobCoordinatorListener coordinatorListener = null;
 
   public PassthroughJobCoordinator(Config config) {
     this.processorId = createProcessorId(config);
     this.config = config;
+    // Resolve this processor's LocationId once, through the LocationIdProviderFactory class configured in
+    // JobConfig (instantiated via Util.getObj). The cached value is reported as this processor's locality
+    // in the GrouperMetadata passed to JobModelManager.readJobModel when the job model is generated.
+    LocationIdProviderFactory locationIdProviderFactory = Util.getObj(new JobConfig(config).getLocationIdProviderFactory(), LocationIdProviderFactory.class);
+    LocationIdProvider locationIdProvider = locationIdProviderFactory.getLocationIdProvider(config);
+    this.locationId = locationIdProvider.getLocationId();
   }
 
   @Override
@@ -119,18 +128,13 @@ public class PassthroughJobCoordinator implements JobCoordinator {
     SystemAdmins systemAdmins = new SystemAdmins(config);
     StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance());
     systemAdmins.start();
-    String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID()));
-
-    /** TODO:
-     Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also,
-     in SamzaContainer for writing locality info to the coordinator stream. This closely couples together
-     TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
-     (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
-     */
-    JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
-        Collections.singletonList(containerId));
-    systemAdmins.stop();
-    return jobModel;
+    try {
+      String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID()));
+      GrouperMetadata grouperMetadata = new GrouperMetadataImpl(ImmutableMap.of(String.valueOf(containerId), locationId), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+      return JobModelManager.readJobModel(this.config, Collections.emptyMap(), streamMetadataCache, grouperMetadata);
+    } finally {
+      systemAdmins.stop();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 64ae310..5442d6e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -125,12 +125,13 @@ public class StorageRecovery extends CommandLine {
    * map
    */
   private void getContainerModels() {
-    CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(jobConfig, new MetricsRegistryMap());
+    MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap(); // single registry instance, shared with JobModelManager.apply below
+    CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(jobConfig, metricsRegistryMap); // NOTE(review): stop() below is not in a finally block, so an exception from bootstrap() or JobModelManager.apply skips coordinatorStreamManager.stop()
     coordinatorStreamManager.register(getClass().getSimpleName());
     coordinatorStreamManager.start();
     coordinatorStreamManager.bootstrap();
     ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
-    JobModel jobModel = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()).jobModel();
+    JobModel jobModel = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping(), metricsRegistryMap).jobModel(); // builds the JobModel from the bootstrapped config and the persisted changelog partition mapping
     containers = jobModel.getContainers();
     coordinatorStreamManager.stop();
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 81c0465..8c5a3ba 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.samza.zk;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.HashMap;
 import java.util.List;
@@ -40,6 +41,8 @@ import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelManager;
@@ -47,17 +50,23 @@ import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.runtime.LocationIdProvider;
+import org.apache.samza.runtime.LocationIdProviderFactory;
 import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
+import org.apache.samza.zk.ZkUtils.ProcessorNode;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,6 +107,7 @@ public class ZkJobCoordinator implements JobCoordinator {
   private final SystemAdmins systemAdmins;
   private final int debounceTimeMs;
   private final Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
+  private final LocationId locationId;
 
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
@@ -131,14 +141,16 @@ public class ZkJobCoordinator implements JobCoordinator {
     this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
     systemAdmins = new SystemAdmins(config);
     streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
+    LocationIdProviderFactory locationIdProviderFactory = Util.getObj(new JobConfig(config).getLocationIdProviderFactory(), LocationIdProviderFactory.class);
+    LocationIdProvider locationIdProvider = locationIdProviderFactory.getLocationIdProvider(config);
+    this.locationId = locationIdProvider.getLocationId();
   }
 
   @Override
   public void start() {
     ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
     zkUtils.validateZkVersion();
-    zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
-        .getJobModelPathPrefix()});
+    zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix(), keyBuilder.getTaskLocalityPath()});
 
     startMetrics();
     systemAdmins.start();
@@ -246,7 +258,13 @@ public class ZkJobCoordinator implements JobCoordinator {
   }
 
   void doOnProcessorChange() {
-    List<String> currentProcessorIds = zkUtils.getSortedActiveProcessorsIDs();
+    List<ProcessorNode> processorNodes = zkUtils.getAllProcessorNodes();
+
+    List<String> currentProcessorIds = new ArrayList<>();
+    for (ProcessorNode processorNode : processorNodes) {
+      currentProcessorIds.add(processorNode.getProcessorData().getProcessorId());
+    }
+
     Set<String> uniqueProcessorIds = new HashSet<>(currentProcessorIds);
 
     if (currentProcessorIds.size() != uniqueProcessorIds.size()) {
@@ -256,7 +274,7 @@ public class ZkJobCoordinator implements JobCoordinator {
 
     // Generate the JobModel
     LOG.info("Generating new JobModel with processors: {}.", currentProcessorIds);
-    JobModel jobModel = generateNewJobModel(currentProcessorIds);
+    JobModel jobModel = generateNewJobModel(processorNodes);
 
     // Create checkpoint and changelog streams if they don't exist
     if (!hasCreatedStreams) {
@@ -308,7 +326,7 @@ public class ZkJobCoordinator implements JobCoordinator {
   /**
    * Generate new JobModel when becoming a leader or the list of processor changed.
    */
-  private JobModel generateNewJobModel(List<String> processors) {
+  private JobModel generateNewJobModel(List<ProcessorNode> processorNodes) {
     String zkJobModelVersion = zkUtils.getJobModelVersion();
     // If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper.
     if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) {
@@ -318,11 +336,9 @@ public class ZkJobCoordinator implements JobCoordinator {
       }
       cachedJobModelVersion = zkJobModelVersion;
     }
-    /**
-     * Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container
-     * to host mapping) is passed in as null when building the jobModel.
-     */
-    JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
+
+    GrouperMetadata grouperMetadata = getGrouperMetadata(zkJobModelVersion, processorNodes);
+    JobModel model = JobModelManager.readJobModel(config, changeLogPartitionMap, streamMetadataCache, grouperMetadata);
     return new JobModel(new MapConfig(), model.getContainers());
   }
 
@@ -343,6 +359,39 @@ public class ZkJobCoordinator implements JobCoordinator {
       });
   }
 
+  /**
+   * Builds the {@link GrouperMetadataImpl} from the JobModel identified by {@code jobModelVersion},
+   * the locality of the live {@code processorNodes}, and the task locality persisted in zookeeper.
+   * @param jobModelVersion the most recent jobModelVersion available in the zookeeper, or null if none exists yet.
+   * @param processorNodes the list of live processors in the zookeeper.
+   * @return the built grouper metadata.
+   */
+  private GrouperMetadataImpl getGrouperMetadata(String jobModelVersion, List<ProcessorNode> processorNodes) {
+    Map<TaskName, String> taskToProcessorId = new HashMap<>();
+    Map<TaskName, List<SystemStreamPartition>> taskToSSPs = new HashMap<>();
+    if (jobModelVersion != null) { // task -> processor and task -> SSP mappings only come from an already published JobModel
+      JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
+      for (ContainerModel containerModel : jobModel.getContainers().values()) {
+        for (TaskModel taskModel : containerModel.getTasks().values()) {
+          TaskName taskName = taskModel.getTaskName();
+          taskToProcessorId.put(taskName, containerModel.getId());
+          for (SystemStreamPartition partition : taskModel.getSystemStreamPartitions()) {
+            taskToSSPs.computeIfAbsent(taskName, k -> new ArrayList<>()).add(partition);
+          }
+        }
+      }
+    }
+
+    Map<String, LocationId> processorLocality = new HashMap<>();
+    for (ProcessorNode processorNode : processorNodes) {
+      ProcessorData processorData = processorNode.getProcessorData();
+      processorLocality.put(processorData.getProcessorId(), processorData.getLocationId());
+    }
+
+    Map<TaskName, LocationId> taskLocality = zkUtils.readTaskLocality();
+    return new GrouperMetadataImpl(processorLocality, taskLocality, taskToSSPs, taskToProcessorId);
+  }
+
   class LeaderElectorListenerImpl implements LeaderElectorListener {
     @Override
     public void onBecomingLeader() {
@@ -390,6 +439,11 @@ public class ZkJobCoordinator implements JobCoordinator {
             JobModel jobModel = getJobModel();
             // start the container with the new model
             if (coordinatorListener != null) {
+              for (ContainerModel containerModel : jobModel.getContainers().values()) {
+                for (TaskName taskName : containerModel.getTasks().keySet()) {
+                  zkUtils.writeTaskLocality(taskName, locationId);
+                }
+              }
               coordinatorListener.onNewJobModel(processorId, jobModel);
             }
           });

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 6349432..56ea577 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.Objects;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
@@ -37,8 +39,10 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.runtime.LocationId;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.zookeeper.data.Stat;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -239,6 +243,29 @@ public class ZkUtils {
     return processorNodes;
   }
 
+  public void writeTaskLocality(TaskName taskName, LocationId locationId) { // stores locationId.getId() at <task locality path>/<taskName>
+    String znodePath = keyBuilder.getTaskLocalityPath() + "/" + taskName;
+    validatePaths(new String[] {znodePath}); // NOTE(review): presumably creates the znode when it is missing; confirm in validatePaths
+    writeData(znodePath, locationId.getId());
+  }
+
+  /**
+   * Reads the persisted {@link LocationId} of every task stored under the task locality path in zookeeper.
+   * Returns an empty map when that path does not exist; tasks whose data reads back as null are skipped.
+   */
+  public Map<TaskName, LocationId> readTaskLocality() {
+    Map<TaskName, LocationId> taskLocality = new HashMap<>();
+    String taskLocalityPath = keyBuilder.getTaskLocalityPath();
+    List<String> tasks = zkClient.exists(taskLocalityPath) ? zkClient.getChildren(taskLocalityPath) : Collections.emptyList();
+    for (String taskName : tasks) {
+      String locationId = zkClient.readData(String.format("%s/%s", taskLocalityPath, taskName), true);
+      if (locationId != null) {
+        taskLocality.put(new TaskName(taskName), new LocationId(locationId));
+      }
+    }
+    return taskLocality;
+  }
+
   /**
    * Method is used to get the <i>sorted</i> list of currently active/registered processors (znodes)
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 03effe6..865658f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -127,8 +127,8 @@ object SamzaContainer extends Logging {
     taskFactory: TaskFactory[_],
     jobContext: JobContext,
     applicationContainerContextFactoryOption: Option[ApplicationContainerContextFactory[ApplicationContainerContext]],
-    applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]
-  ) = {
+    applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]],
+    localityManager: LocalityManager = null) = {
     val config = jobContext.getConfig
     val containerModel = jobModel.getContainers.get(containerId)
     val containerName = "samza-container-%s" format containerId
@@ -735,6 +735,7 @@ object SamzaContainer extends Logging {
       systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
+      localityManager = localityManager,
       offsetManager = offsetManager,
       securityManager = securityManager,
       metrics = samzaContainerMetrics,
@@ -990,16 +991,13 @@ class SamzaContainer(
 
   def storeContainerLocality {
     val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(config).getHostAffinityEnabled
-    if (isHostAffinityEnabled) {
-      val localityManager: LocalityManager = new LocalityManager(config, containerContext.getContainerMetricsRegistry)
+    if (isHostAffinityEnabled && localityManager != null) {
       val containerId = containerContext.getContainerModel.getId
       val containerName = "SamzaContainer-" + containerId
       info("Registering %s with metadata store" format containerName)
       try {
         val hostInet = Util.getLocalHost
-        val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else ""
-        val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else ""
-        info("Writing container locality and JMX address to metadata store")
+        info("Writing container locality to metadata store")
         localityManager.writeContainerToHostMapping(containerId, hostInet.getHostName)
       } catch {
         case uhe: UnknownHostException =>