You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/08/27 02:39:44 UTC

[samza] branch master updated: SAMZA-2439: Remove LocalityManager and container location information from JobModel (#1421)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f7f9f3c  SAMZA-2439: Remove LocalityManager and container location information from JobModel (#1421)
f7f9f3c is described below

commit f7f9f3c7905b047f262383bcc64ebb01ab73f421
Author: mynameborat <bh...@gmail.com>
AuthorDate: Wed Aug 26 19:39:32 2020 -0700

    SAMZA-2439: Remove LocalityManager and container location information from JobModel (#1421)
    
    Issues
    Currently, locality information is part of the job model. The job model is typically immutable and fixed within the lifecycle of an application attempt. The locality information, on the other hand, is dynamic and changes in the event of container movements. This difference makes it complicated to program, model or define semantics around these models when building features. Furthermore, removing this dependency:
    
    - Enables us to move JobModel to public APIs and expose it in JobContext
    - Enables us to cache and serve serialized JobModel from the AM servlet to reduce AM overhead (memory, open connections, num threads) during container startup, esp. for jobs with a large number of containers (See: #1241)
    - Removes tech debt: models should be immutable, and should not update themselves.
    - Removes tech debt: makes current container location a first class concept for container scheduling / placement, and for tools like dashboard, samza-rest, auto-scaling, diagnostics etc.
    
    Changes
    - Separated locality information out of the job model into LocalityModel
    - Introduced an endpoint in AM to serve locality information
    - Added Json MixIns for locality models (LocalityModel & ProcessorLocality)
    - Moved JobModel to samza-api and exposed through JobContext
    
    API Changes:
    - Introduced new models for locality.
    - Previous job model endpoint will no longer serve locality information. i.e. tools using these will need to update to use the new endpoint.
    - Expose JobModel via JobContext
---
 .../java/org/apache/samza/context/JobContext.java  |   6 +
 .../java/org/apache/samza/job/model/JobModel.java  |  76 ------------
 .../org/apache/samza/job/model/LocalityModel.java  |  83 +++++++++++++
 .../apache/samza/job/model/ProcessorLocality.java  |  86 ++++++++++++++
 .../clustermanager/ClusterBasedJobCoordinator.java |   8 +-
 .../samza/clustermanager/ContainerManager.java     |  16 ++-
 .../clustermanager/ContainerProcessManager.java    |  36 ++++--
 .../clustermanager/StandbyContainerManager.java    |  18 ++-
 .../apache/samza/container/LocalityManager.java    |  49 ++++----
 .../org/apache/samza/context/JobContextImpl.java   |  15 ++-
 .../samza/coordinator/server/LocalityServlet.java  |  69 +++++++++++
 .../apache/samza/execution/LocalJobPlanner.java    |   1 -
 .../apache/samza/execution/RemoteJobPlanner.java   |   1 -
 .../apache/samza/processor/StreamProcessor.java    |   2 +-
 .../apache/samza/runtime/ContainerLaunchUtil.java  |   2 +-
 .../serializers/model/JsonLocalityModelMixIn.java  |  37 +++---
 .../model/JsonProcessorLocalityMixIn.java          |  48 ++++++++
 .../samza/serializers/model/SamzaObjectMapper.java |   7 ++
 .../org/apache/samza/storage/StorageRecovery.java  |   4 +-
 .../apache/samza/coordinator/JobModelManager.scala |  32 ++---
 .../apache/samza/job/local/ThreadJobFactory.scala  |   2 +-
 .../TestContainerAllocatorWithHostAffinity.java    |  27 ++---
 .../TestContainerAllocatorWithoutHostAffinity.java |   8 +-
 .../TestContainerPlacementActions.java             |  73 +++++-------
 .../TestContainerProcessManager.java               | 127 ++++++++++++--------
 .../samza/clustermanager/TestStandbyAllocator.java |  10 +-
 .../samza/container/TestLocalityManager.java       |  20 +++-
 .../samza/coordinator/JobModelManagerTestUtil.java |  19 +--
 .../samza/coordinator/TestJobModelManager.java     | 112 ++++--------------
 .../operators/impl/TestOperatorImplGraph.java      |   2 +-
 .../samza/rest/proxy/task/SamzaTaskProxy.java      |   9 +-
 .../test/performance/TestKeyValuePerformance.scala |   2 +-
 .../samza/validation/YarnJobValidationTool.java    |  42 +++----
 .../resources/scalate/WEB-INF/views/index.scaml    |   3 +-
 .../apache/samza/webapp/TestLocalityServlet.java   | 131 +++++++++++++++++++++
 35 files changed, 762 insertions(+), 421 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/context/JobContext.java b/samza-api/src/main/java/org/apache/samza/context/JobContext.java
index 8e41980..7166446 100644
--- a/samza-api/src/main/java/org/apache/samza/context/JobContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/JobContext.java
@@ -19,6 +19,7 @@
 package org.apache.samza.context;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.job.model.JobModel;
 
 
 /**
@@ -46,4 +47,9 @@ public interface JobContext {
    * @return the id for this job
    */
   String getJobId();
+
+  /**
+   * @return the {@link JobModel} for the job
+   */
+  JobModel getJobModel();
 }
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-api/src/main/java/org/apache/samza/job/model/JobModel.java
similarity index 55%
rename from samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
rename to samza-api/src/main/java/org/apache/samza/job/model/JobModel.java
index be26f10..d1f5e72 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-api/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -20,11 +20,8 @@
 package org.apache.samza.job.model;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 
 /**
  * <p>
@@ -39,34 +36,14 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
  * </p>
  */
 public class JobModel {
-  private static final String EMPTY_STRING = "";
   private final Config config;
   private final Map<String, ContainerModel> containers;
 
-  private final LocalityManager localityManager;
-  private final Map<String, String> localityMappings;
-
   public int maxChangeLogStreamPartitions;
 
   public JobModel(Config config, Map<String, ContainerModel> containers) {
-    this(config, containers, null);
-  }
-
-  public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {
     this.config = config;
     this.containers = Collections.unmodifiableMap(containers);
-    this.localityManager = localityManager;
-
-    // initialize container localityMappings
-    this.localityMappings = new HashMap<>();
-    if (localityManager == null) {
-      for (String containerId : containers.keySet()) {
-        localityMappings.put(containerId, null);
-      }
-    } else {
-      populateContainerLocalityMappings();
-    }
-
 
     // Compute the number of change log stream partitions as the maximum partition-id
     // of all total number of tasks of the job; Increment by 1 because partition ids
@@ -84,59 +61,6 @@ public class JobModel {
     return config;
   }
 
-  /**
-   * Returns the container to host mapping for a given container ID and mapping key
-   *
-   * @param containerId the ID of the container
-   * @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.messages.SetContainerHostMapping}
-   * @return the value if it exists for a given container and key, otherwise an empty string
-   */
-  public String getContainerToHostValue(String containerId, String key) {
-    if (localityManager == null) {
-      return EMPTY_STRING;
-    }
-    final Map<String, String> mappings = localityManager.readContainerLocality().get(containerId);
-    if (mappings == null) {
-      return EMPTY_STRING;
-    }
-    if (!mappings.containsKey(key)) {
-      return EMPTY_STRING;
-    }
-    return mappings.get(key);
-  }
-
-  public Map<String, String> getAllContainerToHostValues(String key) {
-    if (localityManager == null) {
-      return Collections.EMPTY_MAP;
-    }
-    Map<String, String> allValues = new HashMap<>();
-    for (Map.Entry<String, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
-      String value = entry.getValue().get(key);
-      if (value != null) {
-        allValues.put(entry.getKey(), value);
-      }
-    }
-    return allValues;
-  }
-
-  private void populateContainerLocalityMappings() {
-    Map<String, Map<String, String>> allMappings = localityManager.readContainerLocality();
-    for (String containerId: containers.keySet()) {
-      if (allMappings.containsKey(containerId)) {
-        localityMappings.put(containerId, allMappings.get(containerId).get(SetContainerHostMapping.HOST_KEY));
-      } else {
-        localityMappings.put(containerId, null);
-      }
-    }
-  }
-
-  public Map<String, String> getAllContainerLocality() {
-    if (localityManager != null) {
-      populateContainerLocalityMappings();
-    }
-    return localityMappings;
-  }
-
   public Map<String, ContainerModel> getContainers() {
     return containers;
   }
diff --git a/samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java b/samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
new file mode 100644
index 0000000..7775434
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. This locality information is used
+ * to place the container, if possible, on the same host on which it was last seen. By doing this, stateful
+ * applications can minimize the bootstrap time of their state by leveraging the local copy.
+ *
+ * It would suffice to have only the {@link ProcessorLocality} model and use it within the locality manager. However,
+ * this abstraction enables us to extend locality beyond the container, e.g. it is useful to track task locality to
+ * enable heterogeneous containers or a fine grained execution model.
+ *
+ * In the YARN deployment model, processor and container are used interchangeably and <i>processorId</i> refers to
+ * the logical container id.
+ */
+public class LocalityModel {
+  /*
+   * A collection of processor locality keyed by processorId.
+   */
+  private final Map<String, ProcessorLocality> processorLocalities;
+
+  /**
+   * Construct the locality model for the job from an immutable copy of the input map of processor localities.
+   * @param processorLocalities host locality information for the job keyed by processor id
+   */
+  public LocalityModel(Map<String, ProcessorLocality> processorLocalities) {
+    this.processorLocalities = ImmutableMap.copyOf(processorLocalities);
+  }
+
+  /**
+   * Returns an immutable {@link Map} of {@link ProcessorLocality} keyed by processor id.
+   */
+  public Map<String, ProcessorLocality> getProcessorLocalities() {
+    return processorLocalities;
+  }
+
+  /**
+   * Returns the {@link ProcessorLocality} for the given processorId, or {@code null} if there is no entry for it.
+   */
+  public ProcessorLocality getProcessorLocality(String processorId) {
+    return processorLocalities.get(processorId);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof LocalityModel)) {
+      return false;
+    }
+    LocalityModel that = (LocalityModel) o;
+    return Objects.deepEquals(processorLocalities, that.processorLocalities);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(processorLocalities);
+  }
+}
diff --git a/samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java b/samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java
new file mode 100644
index 0000000..3478568
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+
+/**
+ * A data model to represent the processor locality information. The locality information refers to the whereabouts
+ * of the physical execution of a container.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced within the framework through the locality
+ * manager.
+ */
+public class ProcessorLocality {
+  /* Processor identifier. In YARN deployment model, this corresponds to the logical container id */
+  private final String id;
+  /* Host on which the processor is currently placed */
+  private final String host;
+  private final String jmxUrl;
+  /* JMX tunneling URL for debugging */
+  private final String jmxTunnelingUrl;
+
+  public ProcessorLocality(String id, String host) {
+    this(id, host, "", ""); // both JMX URLs default to the empty string
+  }
+
+  public ProcessorLocality(String id, String host, String jmxUrl, String jmxTunnelingUrl) {
+    this.id = id;
+    this.host = host;
+    this.jmxUrl = jmxUrl;
+    this.jmxTunnelingUrl = jmxTunnelingUrl;
+  }
+
+  public String id() {
+    return id;
+  }
+
+  public String host() {
+    return host;
+  }
+
+  public String jmxUrl() {
+    return jmxUrl;
+  }
+
+  public String jmxTunnelingUrl() {
+    return jmxTunnelingUrl;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ProcessorLocality that = (ProcessorLocality) o;
+    return Objects.equals(id, that.id)
+        && Objects.equals(host, that.host)
+        && Objects.equals(jmxUrl, that.jmxUrl)
+        && Objects.equals(jmxTunnelingUrl, that.jmxTunnelingUrl);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, host, jmxUrl, jmxTunnelingUrl);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 8482a3b..68e2f77 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -44,6 +44,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.InputStreamsDiscoveredException;
 import org.apache.samza.coordinator.JobModelManager;
@@ -54,6 +55,7 @@ import org.apache.samza.coordinator.StreamRegexMonitor;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
@@ -174,6 +176,7 @@ public class ClusterBasedJobCoordinator {
   private final MetadataStore metadataStore;
 
   private final SystemAdmins systemAdmins;
+  private final LocalityManager localityManager;
 
   /**
    * Internal variable for the instance of {@link JmxServer}
@@ -215,6 +218,8 @@ public class ClusterBasedJobCoordinator {
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     this.isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
     this.jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval();
+    this.localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
 
     // build metastore for container placement messages
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(metadataStore);
@@ -343,6 +348,7 @@ public class ClusterBasedJobCoordinator {
       systemAdmins.stop();
       shutDowncontainerPlacementRequestAllocatorAndUtils();
       containerProcessManager.stop();
+      localityManager.close();
       metadataStore.close();
     } catch (Throwable e) {
       LOG.error("Exception while stopping cluster based job coordinator", e);
@@ -457,7 +463,7 @@ public class ClusterBasedJobCoordinator {
 
   @VisibleForTesting
   ContainerProcessManager createContainerProcessManager() {
-    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore);
+    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager);
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 2730c0c..24130fd 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.UUID;
@@ -27,10 +28,11 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.placement.ContainerPlacementMessage;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
 import org.apache.samza.container.placement.ContainerPlacementResponseMessage;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.util.BoundedLinkedHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,19 +84,23 @@ public class ContainerManager {
 
   private final Optional<StandbyContainerManager> standbyContainerManager;
 
+  private final LocalityManager localityManager;
+
   public ContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
       SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled) {
+      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
+    Preconditions.checkNotNull(localityManager, "Locality manager cannot be null");
     this.samzaApplicationState = samzaApplicationState;
     this.clusterResourceManager = clusterResourceManager;
     this.actions = new ConcurrentHashMap<>();
     this.placementRequestsCache = new BoundedLinkedHashSet<UUID>(UUID_CACHE_SIZE);
     this.hostAffinityEnabled = hostAffinityEnabled;
     this.containerPlacementMetadataStore = containerPlacementMetadataStore;
+    this.localityManager = localityManager;
     // Enable standby container manager if required
     if (standByEnabled) {
       this.standbyContainerManager =
-          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager));
+          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager, localityManager));
     } else {
       this.standbyContainerManager = Optional.empty();
     }
@@ -529,7 +535,9 @@ public class ContainerManager {
           processorId, currentResource.getContainerId(), currentResource.getHost(), requestMessage);
       sourceHost = currentResource.getHost();
     } else {
-      sourceHost = samzaApplicationState.jobModelManager.jobModel().getContainerToHostValue(processorId, SetContainerHostMapping.HOST_KEY);
+      sourceHost = Optional.ofNullable(localityManager.readLocality().getProcessorLocality(processorId))
+          .map(ProcessorLocality::host)
+          .orElse(null);
       LOG.info("Processor ID: {} is not running and was last seen on host: {} for ContainerPlacement action: {}",
           processorId, sourceHost, requestMessage);
     }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index f6e3b1f..1bc1669 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
@@ -26,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
@@ -33,9 +35,11 @@ import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.diagnostics.DiagnosticsManager;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metrics.ContainerProcessManagerMetrics;
 import org.apache.samza.metrics.JvmMetrics;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -103,6 +107,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
   private final Option<DiagnosticsManager> diagnosticsManager;
 
+  private final LocalityManager localityManager;
+
   /**
    * A standard interface to request resources.
    */
@@ -130,7 +136,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
   private Map<String, MetricsReporter> metricsReporters;
 
   public ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry,
-      ContainerPlacementMetadataStore metadataStore) {
+      ContainerPlacementMetadataStore metadataStore, LocalityManager localityManager) {
+    Preconditions.checkNotNull(localityManager, "Locality manager cannot be null");
     this.state = state;
     this.clusterManagerConfig = new ClusterManagerConfig(config);
     this.jobConfig = new JobConfig(config);
@@ -159,11 +166,12 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
       diagnosticsManager = Option.empty();
     }
 
+    this.localityManager = localityManager;
     // Wire all metrics to all reporters
     this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry));
 
     this.containerManager = new ContainerManager(metadataStore, state, clusterResourceManager, hostAffinityEnabled,
-        jobConfig.getStandbyTasksEnabled());
+        jobConfig.getStandbyTasksEnabled(), localityManager);
 
     this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager);
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
@@ -176,7 +184,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
       MetricsRegistryMap registry,
       ClusterResourceManager resourceManager,
       Optional<ContainerAllocator> allocator,
-      ContainerManager containerManager) {
+      ContainerManager containerManager,
+      LocalityManager localityManager) {
     this.state = state;
     this.clusterManagerConfig = clusterManagerConfig;
     this.jobConfig = new JobConfig(clusterManagerConfig);
@@ -186,6 +195,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     this.clusterResourceManager = resourceManager;
     this.containerManager = containerManager;
     this.diagnosticsManager = Option.empty();
+    this.localityManager = localityManager;
     this.containerAllocator = allocator.orElseGet(
       () -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state,
           hostAffinityEnabled, this.containerManager));
@@ -233,8 +243,16 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
 
     // Request initial set of containers
-    Map<String, String> processorToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
-    containerAllocator.requestResources(processorToHostMapping);
+    LocalityModel localityModel = localityManager.readLocality();
+    Map<String, String> processorToHost = new HashMap<>();
+    state.jobModelManager.jobModel().getContainers().keySet().forEach((containerId) -> {
+      String host = Optional.ofNullable(localityModel.getProcessorLocality(containerId))
+          .map(ProcessorLocality::host)
+          .filter(StringUtils::isNotBlank)
+          .orElse(null);
+      processorToHost.put(containerId, host);
+    });
+    containerAllocator.requestResources(processorToHost);
 
     // Start container allocator thread
     LOG.info("Starting the container allocator thread");
@@ -476,8 +494,10 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
     state.neededProcessors.incrementAndGet();
     // Find out previously running container location
-    String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(processorId, SetContainerHostMapping.HOST_KEY);
-    if (!hostAffinityEnabled || lastSeenOn == null) {
+    String lastSeenOn = Optional.ofNullable(localityManager.readLocality().getProcessorLocality(processorId))
+        .map(ProcessorLocality::host)
+        .orElse(null);
+    if (!hostAffinityEnabled || StringUtils.isBlank(lastSeenOn)) {
       lastSeenOn = ResourceRequestState.ANY_HOST;
     }
     LOG.info("Container ID: {} for Processor ID: {} was last seen on host {}.", containerId, processorId, lastSeenOn);
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index 30d0de9..b849ea5 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -26,8 +26,10 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +44,8 @@ public class StandbyContainerManager {
 
   private final SamzaApplicationState samzaApplicationState;
 
+  private final LocalityManager localityManager;
+
   // Map of samza containerIDs to their corresponding active and standby containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1}
   // This is used for checking no two standbys or active-standby-pair are started on the same host
   private final Map<String, List<String>> standbyContainerConstraints;
@@ -53,8 +57,9 @@ public class StandbyContainerManager {
   private ClusterResourceManager clusterResourceManager;
 
   public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
-      ClusterResourceManager clusterResourceManager) {
+      ClusterResourceManager clusterResourceManager, LocalityManager localityManager) {
     this.failovers = new ConcurrentHashMap<>();
+    this.localityManager = localityManager;
     this.standbyContainerConstraints = new HashMap<>();
     this.samzaApplicationState = samzaApplicationState;
     JobModel jobModel = samzaApplicationState.jobModelManager.jobModel();
@@ -297,12 +302,13 @@ public class StandbyContainerManager {
     // We iterate over the list of last-known standbyHosts to check if anyone of them has not already been tried
     for (String standbyContainerID : this.standbyContainerConstraints.get(activeContainerID)) {
 
-      String standbyHost = this.samzaApplicationState.jobModelManager.jobModel().
-          getContainerToHostValue(standbyContainerID, SetContainerHostMapping.HOST_KEY);
+      String standbyHost =
+          Optional.ofNullable(localityManager.readLocality().getProcessorLocality(standbyContainerID))
+              .map(ProcessorLocality::host)
+              .orElse(null);
 
-      if (standbyHost == null || standbyHost.isEmpty()) {
+      if (StringUtils.isBlank(standbyHost)) {
         log.info("No last known standbyHost for container {}", standbyContainerID);
-
       } else if (failoverMetadata.isPresent() && failoverMetadata.get().isStandbyHostUsed(standbyHost)) {
 
         log.info("Not using standby host {} for active container {} because it had already been selected", standbyHost,
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 34baad0..6f6951f 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -19,18 +19,22 @@
 
 package org.apache.samza.container;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.serializers.Serde;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Used for persisting and reading the container-to-host assignment information into the metadata store.
+ * Used for persisting and reading the locality information into the metadata store. Currently, we store the
+ * processor-to-host assignment.
  * */
 public class LocalityManager {
   private static final Logger LOG = LoggerFactory.getLogger(LocalityManager.class);
@@ -53,47 +57,46 @@ public class LocalityManager {
   }
 
   /**
-   * Method to allow read container locality information from the {@link MetadataStore}.
-   * This method is used in {@link org.apache.samza.coordinator.JobModelManager}.
+   * Fetch the processor locality information from the {@link MetadataStore}. In YARN deployment model, the
+   * processor refers to the samza container.
    *
-   * @return the map of containerId: (hostname)
+   * @return the {@code LocalityModel} for the job
    */
-  public Map<String, Map<String, String>> readContainerLocality() {
-    Map<String, Map<String, String>> allMappings = new HashMap<>();
-    metadataStore.all().forEach((containerId, valueBytes) -> {
+  public LocalityModel readLocality() {
+    Map<String, ProcessorLocality> containerLocalityMap = new HashMap<>();
+
+    metadataStore.all().forEach((processorId, valueBytes) -> {
       if (valueBytes != null) {
         String locationId = valueSerde.fromBytes(valueBytes);
-        Map<String, String> values = new HashMap<>();
-        values.put(SetContainerHostMapping.HOST_KEY, locationId);
-        allMappings.put(containerId, values);
+        containerLocalityMap.put(processorId, new ProcessorLocality(processorId, locationId));
       }
     });
     if (LOG.isDebugEnabled()) {
-      for (Map.Entry<String, Map<String, String>> entry : allMappings.entrySet()) {
-        LOG.debug(String.format("Locality for container %s: %s", entry.getKey(), entry.getValue()));
+      for (Map.Entry<String, ProcessorLocality> entry : containerLocalityMap.entrySet()) {
+        LOG.debug(String.format("Locality for container %s: %s", entry.getKey(), entry.getValue().host()));
       }
     }
 
-    return Collections.unmodifiableMap(allMappings);
+    return new LocalityModel(containerLocalityMap);
   }
 
   /**
    * Method to write locality information to the {@link MetadataStore}. This method is used in {@link SamzaContainer}.
    *
-   * @param containerId  the {@link SamzaContainer} ID
+   * @param processorId a.k.a logical container ID
    * @param hostName  the hostname
    */
-  public void writeContainerToHostMapping(String containerId, String hostName) {
-    Map<String, Map<String, String>> containerToHostMapping = readContainerLocality();
-    Map<String, String> existingMappings = containerToHostMapping.get(containerId);
-    String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
-    if (existingHostMapping != null && !existingHostMapping.equals(hostName)) {
-      LOG.info("Container {} moved from {} to {}", containerId, existingHostMapping, hostName);
+  public void writeContainerToHostMapping(String processorId, String hostName) {
+    String existingHostMapping = Optional.ofNullable(readLocality().getProcessorLocality(processorId))
+        .map(ProcessorLocality::host)
+        .orElse(null);
+    if (StringUtils.isNotBlank(existingHostMapping) && !existingHostMapping.equals(hostName)) {
+      LOG.info("Container {} moved from {} to {}", processorId, existingHostMapping, hostName);
     } else {
-      LOG.info("Container {} started at {}", containerId, hostName);
+      LOG.info("Container {} started at {}", processorId, hostName);
     }
 
-    metadataStore.put(containerId, valueSerde.toBytes(hostName));
+    metadataStore.put(processorId, valueSerde.toBytes(hostName));
     metadataStore.flush();
   }
 
diff --git a/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
index ee6b492..4199d39 100644
--- a/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
@@ -20,17 +20,20 @@ package org.apache.samza.context;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.job.model.JobModel;
 
 
 public class JobContextImpl implements JobContext {
   private final Config config;
+  private final JobModel jobModel;
   private final String jobName;
   private final String jobId;
 
-  private JobContextImpl(Config config, String jobName, String jobId) {
+  private JobContextImpl(Config config, String jobName, String jobId, JobModel jobModel) {
     this.config = config;
     this.jobName = jobName;
     this.jobId = jobId;
+    this.jobModel = jobModel;
   }
 
   /**
@@ -38,15 +41,16 @@ public class JobContextImpl implements JobContext {
    * This extracts some information like job name and job id.
    *
    * @param config used to extract job information
+   * @param jobModel job model
    * @return {@link JobContextImpl} corresponding to the {@code config}
    * @throws IllegalArgumentException if job name is not defined in the {@code config}
    */
-  public static JobContextImpl fromConfigWithDefaults(Config config) {
+  public static JobContextImpl fromConfigWithDefaults(Config config, JobModel jobModel) {
     JobConfig jobConfig = new JobConfig(config);
     String jobName = jobConfig.getName()
         .orElseThrow(() -> new IllegalArgumentException(String.format("Config %s is missing", JobConfig.JOB_NAME)));
     String jobId = jobConfig.getJobId();
-    return new JobContextImpl(config, jobName, jobId);
+    return new JobContextImpl(config, jobName, jobId, jobModel);
   }
 
   @Override
@@ -63,4 +67,9 @@ public class JobContextImpl implements JobContext {
   public String getJobId() {
     return this.jobId;
   }
+
+  @Override
+  public JobModel getJobModel() {
+    return this.jobModel;
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java b/samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
new file mode 100644
index 0000000..24e12ff
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.server;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/**
+ * A servlet for locality information of a job. The servlet is hosted alongside of the {@link JobServlet} which hosts
+ * job model and configuration. Historically, locality information was part of job model but we extracted the locality
+ * as job model is static within the lifecycle of an application attempt while locality changes in the event of container
+ * movements. The locality information is served under
+ * {@link org.apache.samza.coordinator.JobModelManager#server()}/locality. The server and the port information are
+ * dynamic and are determined at the start of the AM. The YARN dashboard or the job-coordinator logs contain the
+ * server and the port information.
+ *
+ * This separation enables us to achieve performance benefits by caching job model when it is served by the AM as it
+ * can incur significant penalty in the job start time for jobs with large number of containers.
+ */
+public class LocalityServlet extends HttpServlet {
+  private static final String PROCESSOR_ID_PARAM = "processorId";
+  private final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  private final LocalityManager localityManager;
+
+  public LocalityServlet(LocalityManager localityManager) {
+    this.localityManager = localityManager;
+  }
+
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    LocalityModel localityModel = localityManager.readLocality();
+
+    if (request.getParameterMap().size() == 1) {
+      String processorId = request.getParameter(PROCESSOR_ID_PARAM);
+      ProcessorLocality processorLocality = Optional.ofNullable(localityModel.getProcessorLocality(processorId))
+          .orElse(new ProcessorLocality(processorId, ""));
+      mapper.writeValue(response.getWriter(), processorLocality);
+    } else {
+      mapper.writeValue(response.getWriter(), localityModel);
+    }
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
index f55f02f..79b52e6 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -97,7 +97,6 @@ public class LocalJobPlanner extends JobPlanner {
     }
 
     // 2. create the necessary streams
-    // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391
     // TODO: this works for single-job applications. For multi-job applications, ExecutionPlan should return an AppConfig
     // to be used for the whole application
     JobConfig jobConfig = jobConfigs.get(0);
diff --git a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
index c51fd85..9dd85b5 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
@@ -48,7 +48,6 @@ public class RemoteJobPlanner extends JobPlanner {
   @Override
   public List<JobConfig> prepareJobs() {
     // for high-level DAG, generate the plan and job configs
-    // TODO: run.id needs to be set for standalone: SAMZA-1531
     // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision
     String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
     LOG.info("The run id for this run is {}", runId);
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index ed0c875..47c1754 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -405,7 +405,7 @@ public class StreamProcessor {
     }
 
     return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
-        this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
+        this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config, jobModel),
         Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
         Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
         Option.apply(this.externalContextOptional.orElse(null)), null, startpointManager,
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 4470ae7..a5148fb 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -128,7 +128,7 @@ public class ContainerLaunchUtil {
           containerId, jobModel,
           ScalaJavaUtil.toScalaMap(metricsReporters),
           taskFactory,
-          JobContextImpl.fromConfigWithDefaults(config),
+          JobContextImpl.fromConfigWithDefaults(config, jobModel),
           Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
           Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
           Option.apply(externalContextOptional.orElse(null)),
diff --git a/samza-api/src/main/java/org/apache/samza/context/JobContext.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonLocalityModelMixIn.java
similarity index 53%
copy from samza-api/src/main/java/org/apache/samza/context/JobContext.java
copy to samza-core/src/main/java/org/apache/samza/serializers/model/JsonLocalityModelMixIn.java
index 8e41980..79b1367 100644
--- a/samza-api/src/main/java/org/apache/samza/context/JobContext.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonLocalityModelMixIn.java
@@ -16,34 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza.context;
+package org.apache.samza.serializers.model;
 
-import org.apache.samza.config.Config;
+import java.util.Map;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
 
 
 /**
- * The framework-provided context for the job.
+ * A mix-in Jackson class to convert {@link org.apache.samza.job.model.LocalityModel} to/from JSON
  */
-public interface JobContext {
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class JsonLocalityModelMixIn {
+  @JsonCreator
+  public JsonLocalityModelMixIn(@JsonProperty("processor-localities") Map<String, ProcessorLocality> processorLocalities) {
 
-  /**
-   * Gets the final configuration for this job.
-   *
-   * @return the configuration for this job
-   */
-  Config getConfig();
+  }
 
-  /**
-   * Gets the name of the job.
-   *
-   * @return the name of this job
-   */
-  String getJobName();
-
-  /**
-   * Gets the id for this job.
-   *
-   * @return the id for this job
-   */
-  String getJobId();
+  @JsonProperty("processor-localities")
+  abstract Map<String, ProcessorLocality> processorLocalities();
 }
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonProcessorLocalityMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonProcessorLocalityMixIn.java
new file mode 100644
index 0000000..cce573e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonProcessorLocalityMixIn.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers.model;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * A mix-in Jackson class to convert {@link org.apache.samza.job.model.ProcessorLocality} to/from JSON
+ * <b>NOTE:</b> In YARN deployment model, the id refers to the logical container id.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class JsonProcessorLocalityMixIn {
+  @JsonCreator
+  public JsonProcessorLocalityMixIn(@JsonProperty("id") String id, @JsonProperty("host") String host,
+      @JsonProperty("jmx-url") String jmxUrl, @JsonProperty("jmx-tunneling-url") String jmxTunnelingUrl) {
+  }
+
+  @JsonProperty("id")
+  abstract String id();
+
+  @JsonProperty("host")
+  abstract String host();
+
+  @JsonProperty("jmx-url")
+  abstract String jmxUrl();
+
+  @JsonProperty("jmx-tunneling-url")
+  abstract String jmxTunnelingUrl();
+}
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 694987f..db147f0 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -28,7 +28,9 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.system.SystemStream;
@@ -130,6 +132,11 @@ public class SamzaObjectMapper {
       }
     });
 
+    mapper.getSerializationConfig().addMixInAnnotations(LocalityModel.class, JsonLocalityModelMixIn.class);
+    mapper.getDeserializationConfig().addMixInAnnotations(LocalityModel.class, JsonLocalityModelMixIn.class);
+    mapper.getSerializationConfig().addMixInAnnotations(ProcessorLocality.class, JsonProcessorLocalityMixIn.class);
+    mapper.getDeserializationConfig().addMixInAnnotations(ProcessorLocality.class, JsonProcessorLocalityMixIn.class);
+
     // Convert camel case to hyphenated field names, and register the module.
     mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
     mapper.registerModule(module);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index f352bd0..9d1896e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -77,6 +77,7 @@ public class StorageRecovery {
   private final Map<String, ContainerStorageManager> containerStorageManagers = new HashMap<>();
 
   private int maxPartitionNumber = 0;
+  private JobModel jobModel;
   private Map<String, ContainerModel> containers = new HashMap<>();
 
   /**
@@ -145,6 +146,7 @@ public class StorageRecovery {
           JobModelManager.apply(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(),
               coordinatorStreamStore, metricsRegistryMap);
       JobModel jobModel = jobModelManager.jobModel();
+      this.jobModel = jobModel;
       containers = jobModel.getContainers();
     } finally {
       coordinatorStreamStore.close();
@@ -249,7 +251,7 @@ public class StorageRecovery {
               jobConfig,
               new HashMap<>(),
               new SamzaContainerMetrics(containerModel.getId(), new MetricsRegistryMap(), ""),
-              JobContextImpl.fromConfigWithDefaults(jobConfig),
+              JobContextImpl.fromConfigWithDefaults(jobConfig, jobModel),
               containerContext,
               new HashMap<>(),
               storeBaseDir,
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 37162c4..c7e7c7c 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -35,8 +35,7 @@ import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
 import org.apache.samza.container.LocalityManager
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.coordinator.server.{HttpServer, JobServlet, LocalityServlet}
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
@@ -95,14 +94,15 @@ object JobModelManager extends Logging {
       val grouperMetadata: GrouperMetadata = getGrouperMetadata(config, localityManager, taskAssignmentManager, taskPartitionAssignmentManager)
 
       val jobModel: JobModel = readJobModel(config, changelogPartitionMapping, streamMetadataCache, grouperMetadata)
-      jobModelRef.set(new JobModel(jobModel.getConfig, jobModel.getContainers, localityManager))
+      jobModelRef.set(new JobModel(jobModel.getConfig, jobModel.getContainers))
 
       updateTaskAssignments(jobModel, taskAssignmentManager, taskPartitionAssignmentManager, grouperMetadata)
 
       val server = new HttpServer
       server.addServlet("/", new JobServlet(jobModelRef))
+      server.addServlet("/locality", new LocalityServlet(localityManager))
 
-      currentJobModelManager = new JobModelManager(jobModelRef.get(), server, localityManager)
+      currentJobModelManager = new JobModelManager(jobModelRef.get(), server)
       currentJobModelManager
     } finally {
       systemAdmins.stop()
@@ -167,15 +167,18 @@ object JobModelManager extends Logging {
     */
   def getProcessorLocality(config: Config, localityManager: LocalityManager) = {
     val containerToLocationId: util.Map[String, LocationId] = new util.HashMap[String, LocationId]()
-    val existingContainerLocality = localityManager.readContainerLocality()
+    val existingContainerLocality = localityManager.readLocality().getProcessorLocalities
 
     for (containerId <- 0 until new JobConfig(config).getContainerCount) {
-      val localityMapping = existingContainerLocality.get(containerId.toString)
+      val preferredHost = Option.apply(existingContainerLocality.get(containerId.toString))
+        .map(containerLocality => containerLocality.host())
+        .filter(host => host.nonEmpty)
+        .orNull
       // To handle the case when the container count is increased between two different runs of a samza-yarn job,
       // set the locality of newly added containers to any_host.
       var locationId: LocationId = new LocationId("ANY_HOST")
-      if (localityMapping != null && localityMapping.containsKey(SetContainerHostMapping.HOST_KEY)) {
-        locationId = new LocationId(localityMapping.get(SetContainerHostMapping.HOST_KEY))
+      if (preferredHost != null) {
+        locationId = new LocationId(preferredHost)
       }
       containerToLocationId.put(containerId.toString, locationId)
     }
@@ -366,6 +369,7 @@ object JobModelManager extends Logging {
 
     // processor list is required by some of the groupers. So, let's pass them as part of the config.
     // Copy the config and add the processor list to the config copy.
+    // TODO: It is non-ideal to have config as a medium to transmit the locality information; especially, if the locality information evolves. Evaluate options on using context objects to pass dependent components.
     val configMap = new util.HashMap[String, String](config)
     configMap.put(JobConfig.PROCESSOR_LIST, String.join(",", grouperMetadata.getProcessorLocality.keySet()))
     val grouper = getSystemStreamPartitionGrouper(new MapConfig(configMap))
@@ -444,12 +448,7 @@ class JobModelManager(
   /**
    * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
    */
-  val server: HttpServer = null,
-
-  /**
-   * LocalityManager employed to read and write container and task locality information to metadata store.
-   */
-  val localityManager: LocalityManager = null) extends Logging {
+  val server: HttpServer = null) extends Logging {
 
   debug("Got job model: %s." format jobModel)
 
@@ -466,11 +465,6 @@ class JobModelManager(
       debug("Stopping HTTP server.")
       server.stop
       info("Stopped HTTP server.")
-      if (localityManager != null) {
-        info("Stopping localityManager")
-        localityManager.close()
-        info("Stopped localityManager")
-      }
     }
   }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 79bd181..9b5a073 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -149,7 +149,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
         jobModel,
         Map[String, MetricsReporter](),
         taskFactory,
-        JobContextImpl.fromConfigWithDefaults(config),
+        JobContextImpl.fromConfigWithDefaults(config, jobModel),
         Option(appDesc.getApplicationContainerContextFactory.orElse(null)),
         Option(appDesc.getApplicationTaskContextFactory.orElse(null)),
         buildExternalContext(config)
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index d5819eb..2b4a4b0 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -36,7 +36,8 @@ import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.JobModelManagerTestUtil;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -58,7 +59,7 @@ import static org.mockito.Mockito.*;
 public class TestContainerAllocatorWithHostAffinity {
 
   private final Config config = getConfig();
-  private final JobModelManager jobModelManager = initializeJobModelManager(config, 1);
+  private final JobModelManager jobModelManager = initializeJobModelManager(getConfig(), 1);
   private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
   private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
 
@@ -67,14 +68,7 @@ public class TestContainerAllocatorWithHostAffinity {
   private ContainerManager containerManager;
 
   private JobModelManager initializeJobModelManager(Config config, int containerCount) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    localityMap.put("0", new HashMap<String, String>() { {
-        put(SetContainerHostMapping.HOST_KEY, "abc");
-      } });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-
-    return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), containerCount, mockLocalityManager,
+    return JobModelManagerTestUtil.getJobModelManager(config, containerCount,
         new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)));
   }
 
@@ -87,12 +81,15 @@ public class TestContainerAllocatorWithHostAffinity {
 
   @Before
   public void setup() throws Exception {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "abc"))));
     CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config);
     CoordinatorStreamStore coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
     coordinatorStreamStore.init();
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
-    containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false);
+    containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
     containerAllocator =
         new ContainerAllocator(clusterResourceManager, config, state, true, containerManager);
     requestState = new MockContainerRequestState(clusterResourceManager, true);
@@ -372,7 +369,7 @@ public class TestContainerAllocatorWithHostAffinity {
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
     ClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false);
+        new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class));
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
       SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
@@ -419,7 +416,7 @@ public class TestContainerAllocatorWithHostAffinity {
   public void testExpiredRequestAllocationOnAnyHost() throws Exception {
     MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false, mock(LocalityManager.class)));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyManager, config, state, true, spyContainerManager));
     // Request Preferred Resources
@@ -463,7 +460,7 @@ public class TestContainerAllocatorWithHostAffinity {
     // Add Extra Resources
     MockClusterResourceManager spyClusterResourceManager = spy(new MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false, mock(LocalityManager.class)));
 
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyClusterResourceManager, config, state, true, spyContainerManager));
@@ -516,7 +513,7 @@ public class TestContainerAllocatorWithHostAffinity {
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
     MockClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class)));
 
     SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, "host-0", "id0",
         System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index f9104bd..b808296 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -26,10 +26,12 @@ import java.util.Map;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.JobModelManagerTestUtil;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -72,13 +74,15 @@ public class TestContainerAllocatorWithoutHostAffinity {
 
   @Before
   public void setup() throws Exception {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
     CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config);
     CoordinatorStreamStore coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
     coordinatorStreamStore.init();
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
     containerAllocator = new ContainerAllocator(manager, config, state, false,
-        new ContainerManager(containerPlacementMetadataStore, state, manager, false, false));
+        new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager));
     requestState = new MockContainerRequestState(manager, false);
     Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -276,7 +280,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
     ClusterResourceManager.Callback mockCPM = mock(ClusterResourceManager.Callback.class);
     ClusterResourceManager mockManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, mockManager, false, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockManager, false, false, mock(LocalityManager.class)));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(mockManager, config, state, false, spyContainerManager));
     // Mock the callback from ClusterManager to add resources to the allocator
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 0ec635d..53bd5b0 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -45,8 +45,8 @@ import org.apache.samza.coordinator.JobModelManagerTestUtil;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -103,6 +103,7 @@ public class TestContainerPlacementActions {
   private ContainerManager containerManager;
   private MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity;
   private ContainerProcessManager cpm;
+  private LocalityManager localityManager;
   private ClusterResourceManager.Callback callback;
 
   private Config getConfig() {
@@ -122,30 +123,8 @@ public class TestContainerPlacementActions {
     return new MapConfig(map);
   }
 
-  private JobModelManager getJobModelManagerWithHostAffinity(Map<String, String> containerIdToHost) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    containerIdToHost.forEach((containerId, host) -> {
-      localityMap.put(containerId,
-          ImmutableMap.of(SetContainerHostMapping.HOST_KEY, containerIdToHost.get(containerId)));
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-
-    return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), containerIdToHost.size(),
-        mockLocalityManager, this.server);
-  }
-
-  private JobModelManager getJobModelManagerWithHostAffinityWithStandby(Map<String, String> containerIdToHost) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    containerIdToHost.forEach((containerId, host) -> {
-      localityMap.put(containerId,
-          ImmutableMap.of(SetContainerHostMapping.HOST_KEY, containerIdToHost.get(containerId)));
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-    // Generate JobModel for standby containers
-    JobModel standbyJobModel = TestStandbyAllocator.getJobModelWithStandby(2, 2, 2, Optional.of(mockLocalityManager));
-    return new JobModelManager(standbyJobModel, server, null);
+  private JobModelManager getJobModelManagerWithStandby() {
+    return new JobModelManager(TestStandbyAllocator.getJobModelWithStandby(2, 2, 2), server);
   }
 
   @Before
@@ -159,14 +138,19 @@ public class TestContainerPlacementActions {
     containerPlacementMetadataStore.start();
     // Utils Related to Cluster manager:
     config = new MapConfig(configVals, getConfigWithHostAffinityAndRetries(true, 1, true));
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+    state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false));
+    localityManager = mock(LocalityManager.class);
+    when(localityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of(
+            "0", new ProcessorLocality("0", "host-1"),
+            "1", new ProcessorLocality("1", "host-2"))));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
-            clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager);
+            clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager);
   }
 
   @After
@@ -177,15 +161,22 @@ public class TestContainerPlacementActions {
   }
 
   public void setupStandby() throws Exception {
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinityWithStandby(ImmutableMap.of("0", "host-1", "1", "host-2", "0-0", "host-2", "1-0", "host-1")));
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of(
+            "0", new ProcessorLocality("0", "host-1"),
+            "1", new ProcessorLocality("1", "host-2"),
+            "0-0", new ProcessorLocality("0-0", "host-2"), // id mirrors the map key, as for "0" and "1"
+            "1-0", new ProcessorLocality("1-0", "host-1"))));
+    state = new SamzaApplicationState(getJobModelManagerWithStandby());
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     // Enable standby
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, true));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, true, mockLocalityManager));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
-        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager);
+        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, mockLocalityManager);
   }
 
   @Test(timeout = 10000)
@@ -558,14 +549,14 @@ public class TestContainerPlacementActions {
   public void testContainerPlacementsForJobRunningInDegradedState() throws Exception {
     // Set failure after retries to false to enable job running in degraded state
     config = new MapConfig(configVals, getConfigWithHostAffinityAndRetries(true, 1, false));
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+    state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
-        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager);
+        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager);
 
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
@@ -672,18 +663,18 @@ public class TestContainerPlacementActions {
     Map<String, String> conf = new HashMap<>();
     conf.putAll(getConfigWithHostAffinityAndRetries(false, 1, true));
     SamzaApplicationState state =
-        new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+        new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, false, false);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, false, false, localityManager);
     MockContainerAllocatorWithoutHostAffinity allocatorWithoutHostAffinity =
         new MockContainerAllocatorWithoutHostAffinity(clusterResourceManager, new MapConfig(conf), state,
             containerManager);
 
     ContainerProcessManager cpm = new ContainerProcessManager(
         new ClusterManagerConfig(new MapConfig(getConfig(), getConfigWithHostAffinityAndRetries(false, 1, true))), state,
-        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithoutHostAffinity), containerManager);
+        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithoutHostAffinity), containerManager, localityManager);
 
     // Mimic Cluster Manager returning any request
     doAnswer(new Answer<Void>() {
@@ -807,16 +798,16 @@ public class TestContainerPlacementActions {
   @Test(expected = NullPointerException.class)
   public void testBadControlRequestRejected() throws Exception {
     SamzaApplicationState state =
-        new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+        new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ContainerManager containerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
     MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
         new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     ContainerProcessManager cpm = new ContainerProcessManager(
         new ClusterManagerConfig(new MapConfig(getConfig(), getConfigWithHostAffinityAndRetries(true, 1, true))), state,
-        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager);
+        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager);
 
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index a5dbe77..5e550cf 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -38,7 +38,8 @@ import org.apache.samza.coordinator.JobModelManagerTestUtil;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -108,20 +109,7 @@ public class TestContainerProcessManager {
 
   private HttpServer server = null;
 
-
-  private JobModelManager getJobModelManagerWithHostAffinity(Map<String, String> containerIdToHost) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    containerIdToHost.forEach((containerId, host) -> {
-      localityMap.put(containerId, ImmutableMap.of(SetContainerHostMapping.HOST_KEY, containerIdToHost.get(containerId)));
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-
-    return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(),
-        containerIdToHost.size(), mockLocalityManager, this.server);
-  }
-
-  private JobModelManager getJobModelManagerWithoutHostAffinity(int containerCount) {
+  private JobModelManager getJobModelManager(int containerCount) {
     return JobModelManagerTestUtil.getJobModelManager(getConfig(), containerCount, this.server);
   }
 
@@ -149,11 +137,14 @@ public class TestContainerProcessManager {
     conf.put("cluster-manager.container.memory.mb", "500");
     conf.put("cluster-manager.container.cpu.cores", "5");
 
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"))));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false);
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
     ContainerProcessManager cpm =
         buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());
 
@@ -169,7 +160,7 @@ public class TestContainerProcessManager {
     conf.put("cluster-manager.container.memory.mb", "500");
     conf.put("cluster-manager.container.cpu.cores", "5");
 
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1")));
+    state = new SamzaApplicationState(getJobModelManager(1));
     callback = new MockClusterResourceManagerCallback();
     clusterResourceManager = new MockClusterResourceManager(callback, state);
     cpm = new ContainerProcessManager(
@@ -178,7 +169,8 @@ public class TestContainerProcessManager {
         new MetricsRegistryMap(),
         clusterResourceManager,
         Optional.empty(),
-        containerManager
+        containerManager,
+        mockLocalityManager
     );
 
     allocator =
@@ -192,12 +184,12 @@ public class TestContainerProcessManager {
   @Test
   public void testOnInit() throws Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     ContainerProcessManager cpm =
@@ -237,7 +229,7 @@ public class TestContainerProcessManager {
   @Test
   public void testOnShutdown() throws Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
@@ -260,12 +252,12 @@ public class TestContainerProcessManager {
   @Test
   public void testCpmShouldStopWhenContainersFinish() throws Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -308,12 +300,12 @@ public class TestContainerProcessManager {
   @Test
   public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -408,11 +400,11 @@ public class TestContainerProcessManager {
     int maxRetries = 3;
     String processorId = "0";
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries, failAfterRetries));
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -494,12 +486,22 @@ public class TestContainerProcessManager {
     int maxRetries = 3;
     String processorId = "0";
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries, failAfterRetries));
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+
+    if (withHostAffinity) {
+      when(mockLocalityManager.readLocality())
+          .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"))));
+    } else {
+      when(mockLocalityManager.readLocality())
+          .thenReturn(new LocalityModel(new HashMap<>()));
+    }
+
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-            clusterManagerConfig.getHostAffinityEnabled(), false);
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false, mockLocalityManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -508,7 +510,7 @@ public class TestContainerProcessManager {
         containerManager);
 
     ContainerProcessManager cpm =
-        buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));
+        buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator), mockLocalityManager);
 
     // start triggers a request
     cpm.start();
@@ -603,12 +605,12 @@ public class TestContainerProcessManager {
   public void testInvalidNotificationsAreIgnored() throws Exception {
     Config conf = getConfig();
 
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -644,13 +646,16 @@ public class TestContainerProcessManager {
 
   @Test
   public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("1", "host1")));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     Map<String, String> configMap = new HashMap<>();
     configMap.putAll(getConfig());
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1")))); // id mirrors the map key
+    ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -660,7 +665,7 @@ public class TestContainerProcessManager {
 
     ContainerProcessManager manager =
         new ContainerProcessManager(new ClusterManagerConfig(config), state, new MetricsRegistryMap(), clusterResourceManager,
-            Optional.of(allocator), containerManager);
+            Optional.of(allocator), containerManager, mockLocalityManager);
 
     manager.start();
     SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1");
@@ -680,12 +685,14 @@ public class TestContainerProcessManager {
     config.put("cluster-manager.container.request.timeout.ms", "10000");
     Config cfg = new MapConfig(config);
     // 1. Request two containers on hosts - host1 and host2
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1",
-        "1", "host2")));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(2));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"), "1", new ProcessorLocality("1", "host2"))));
+    ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager);
 
     MockContainerAllocatorWithHostAffinity allocator = new MockContainerAllocatorWithHostAffinity(
         clusterResourceManager,
@@ -694,7 +701,7 @@ public class TestContainerProcessManager {
         containerManager);
 
     ContainerProcessManager cpm =
-        spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator)));
+        spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator), mockLocalityManager));
 
     cpm.start();
     assertFalse(cpm.shouldShutdown());
@@ -744,12 +751,12 @@ public class TestContainerProcessManager {
 
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfig());
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(conf)));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -819,12 +826,12 @@ public class TestContainerProcessManager {
 
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfig());
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(config)));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -915,10 +922,32 @@ public class TestContainerProcessManager {
     server.stop();
   }
 
+  private ContainerManager buildContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
+      SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
+      boolean hostAffinityEnabled, boolean standByEnabled) {
+    LocalityManager emptyLocalityManager = mock(LocalityManager.class); // overload for callers that pass no LocalityManager
+    when(emptyLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+    return buildContainerManager(containerPlacementMetadataStore, samzaApplicationState,
+        clusterResourceManager, hostAffinityEnabled, standByEnabled, emptyLocalityManager);
+  }
+
+  private ContainerManager buildContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
+      SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
+      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) { // forwards all arguments unchanged
+    return new ContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager,
+        hostAffinityEnabled, standByEnabled, localityManager);
+  }
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator) {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>())); // default: empty locality model
+    return buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, allocator, mockLocalityManager);
+  }
+
+  private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
+      ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator, LocalityManager localityManager) {
     return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager,
-        allocator, new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        clusterManagerConfig.getHostAffinityEnabled(), false));
+        allocator, buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        clusterManagerConfig.getHostAffinityEnabled(), false, localityManager), localityManager); // shared by both managers
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
index c5f3ec1..cb3a7a7 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
@@ -22,10 +22,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import org.apache.samza.Partition;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -40,7 +38,7 @@ public class TestStandbyAllocator {
 
   @Test
   public void testWithNoStandby() {
-    JobModel jobModel = getJobModelWithStandby(1, 1, 1, Optional.empty());
+    JobModel jobModel = getJobModelWithStandby(1, 1, 1); // nContainers=1, nTasks=1, replicationFactor=1
     List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints("0", jobModel);
     Assert.assertEquals("Constrained container count should be 0", 0, containerConstraints.size());
   }
@@ -59,7 +57,7 @@ public class TestStandbyAllocator {
 
 
   public void testWithStandby(int nContainers, int nTasks, int replicationFactor) {
-    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, replicationFactor, Optional.empty());
+    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, replicationFactor);
 
     for (String containerID : jobModel.getContainers().keySet()) {
       List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints(containerID, jobModel);
@@ -81,7 +79,7 @@ public class TestStandbyAllocator {
   }
 
   // Helper method to create a jobmodel with given number of containers, tasks and replication factor
-  public static JobModel getJobModelWithStandby(int nContainers, int nTasks, int replicationFactor, Optional<LocalityManager> localityManager) {
+  public static JobModel getJobModelWithStandby(int nContainers, int nTasks, int replicationFactor) {
     Map<String, ContainerModel> containerModels = new HashMap<>();
     int taskID = 0;
 
@@ -104,7 +102,7 @@ public class TestStandbyAllocator {
     }
 
     containerModels.putAll(standbyContainerModels);
-    return new JobModel(new MapConfig(), containerModels, localityManager.orElse(null));
+    return new JobModel(new MapConfig(), containerModels);
   }
 
   // Helper method that creates a taskmodel with one input ssp
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
index d18ad67..f2819c7 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
@@ -34,7 +34,10 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 
 public class TestLocalityManager {
 
@@ -59,7 +62,7 @@ public class TestLocalityManager {
     LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE));
 
     localityManager.writeContainerToHostMapping("0", "localhost");
-    Map<String, Map<String, String>> localMap = localityManager.readContainerLocality();
+    Map<String, Map<String, String>> localMap = readContainerLocality(localityManager);
     Map<String, Map<String, String>> expectedMap =
       new HashMap<String, Map<String, String>>() {
         {
@@ -87,9 +90,9 @@ public class TestLocalityManager {
 
     localityManager.writeContainerToHostMapping("1", "localhost");
 
-    assertEquals(localityManager.readContainerLocality().size(), 1);
+    assertEquals(readContainerLocality(localityManager).size(), 1);
 
-    assertEquals(ImmutableMap.of("1", ImmutableMap.of("host", "localhost")), localityManager.readContainerLocality());
+    assertEquals(ImmutableMap.of("1", ImmutableMap.of("host", "localhost")), readContainerLocality(localityManager));
 
     localityManager.close();
 
@@ -98,4 +101,13 @@ public class TestLocalityManager {
     assertTrue(producer.isStopped());
     assertTrue(consumer.isStopped());
   }
+
+  // Test helper: reshapes the manager's locality model into a containerId -> {"host": host} map.
+  static Map<String, Map<String, String>> readContainerLocality(LocalityManager localityManager) {
+    Map<String, Map<String, String>> hostByContainer = new HashMap<>();
+    localityManager.readLocality()
+        .getProcessorLocalities()
+        .forEach((id, locality) -> hostByContainer.put(id, ImmutableMap.of("host", locality.host())));
+    return hostByContainer;
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
index 6e29d81..3affa71 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
@@ -22,13 +22,9 @@ package org.apache.samza.coordinator;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.runtime.LocationId;
-import org.apache.samza.system.StreamMetadataCache;
 
 /**
  * Utils to create instances of {@link JobModelManager} in unit tests
@@ -36,23 +32,12 @@ import org.apache.samza.system.StreamMetadataCache;
 public class JobModelManagerTestUtil {
 
   public static JobModelManager getJobModelManager(Config config, int containerCount, HttpServer server) {
-    return getJobModelManagerWithLocalityManager(config, containerCount, null, server);
-  }
-
-  public static JobModelManager getJobModelManagerWithLocalityManager(Config config, int containerCount, LocalityManager localityManager, HttpServer server) {
     Map<String, ContainerModel> containers = new java.util.HashMap<>();
     for (int i = 0; i < containerCount; i++) {
       ContainerModel container = new ContainerModel(String.valueOf(i), new HashMap<>());
       containers.put(String.valueOf(i), container);
     }
-    JobModel jobModel = new JobModel(config, containers, localityManager);
-    return new JobModelManager(jobModel, server, null);
-  }
-
-  public static JobModelManager getJobModelManagerUsingReadModel(Config config, StreamMetadataCache streamMetadataCache,
-      HttpServer server, LocalityManager localityManager, Map<String, LocationId> processorLocality) {
-    JobModel jobModel = JobModelManager.readJobModel(config, new HashMap<>(), streamMetadataCache,
-        new GrouperMetadataImpl(processorLocality, new HashMap<>(), new HashMap<>(), new HashMap<>()));
-    return new JobModelManager(new JobModel(jobModel.getConfig(), jobModel.getContainers(), localityManager), server, localityManager);
+    JobModel jobModel = new JobModel(config, containers); // one ContainerModel per index in [0, containerCount)
+    return new JobModelManager(jobModel, server); // no LocalityManager is passed any more
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index 9908da5..83de0cf 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -39,9 +39,10 @@ import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
 import org.apache.samza.container.grouper.task.TaskAssignmentManager;
 import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
 import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.runtime.LocationId;
@@ -57,10 +58,12 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentMatcher;
@@ -104,72 +107,6 @@ public class TestJobModelManager {
   }
 
   @Test
-  public void testLocalityMapWithHostAffinity() {
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        put("cluster-manager.container.count", "1");
-        put("cluster-manager.container.memory.mb", "512");
-        put("cluster-manager.container.retry.count", "1");
-        put("cluster-manager.container.retry.window.ms", "1999999999");
-        put("cluster-manager.allocator.sleep.ms", "10");
-        put("yarn.package.path", "/foo");
-        put("task.inputs", "test-system.test-stream");
-        put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
-        put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
-        put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-        put("job.host-affinity.enabled", "true");
-      }
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-
-    localityMappings.put("0", new HashMap<String, String>() { {
-        put(SetContainerHostMapping.HOST_KEY, "abc-affinity");
-      } });
-    when(mockLocalityManager.readContainerLocality()).thenReturn(this.localityMappings);
-
-    Map<String, LocationId> containerLocality = ImmutableMap.of("0", new LocationId("abc-affinity"));
-    this.jobModelManager =
-        JobModelManagerTestUtil.getJobModelManagerUsingReadModel(config, mockStreamMetadataCache, server,
-            mockLocalityManager, containerLocality);
-
-    assertEquals(jobModelManager.jobModel().getAllContainerLocality(), ImmutableMap.of("0", "abc-affinity"));
-  }
-
-  @Test
-  public void testLocalityMapWithoutHostAffinity() {
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        put("cluster-manager.container.count", "1");
-        put("cluster-manager.container.memory.mb", "512");
-        put("cluster-manager.container.retry.count", "1");
-        put("cluster-manager.container.retry.window.ms", "1999999999");
-        put("cluster-manager.allocator.sleep.ms", "10");
-        put("yarn.package.path", "/foo");
-        put("task.inputs", "test-system.test-stream");
-        put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
-        put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
-        put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-        put("job.host-affinity.enabled", "false");
-      }
-    });
-
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-
-    localityMappings.put("0", new HashMap<String, String>() { {
-        put(SetContainerHostMapping.HOST_KEY, "abc-affinity");
-      } });
-    when(mockLocalityManager.readContainerLocality()).thenReturn(new HashMap<>());
-
-    Map<String, LocationId> containerLocality = ImmutableMap.of("0", new LocationId("abc-affinity"));
-
-    this.jobModelManager =
-        JobModelManagerTestUtil.getJobModelManagerUsingReadModel(config, mockStreamMetadataCache, server,
-            mockLocalityManager, containerLocality);
-
-    assertEquals(jobModelManager.jobModel().getAllContainerLocality(), Collections.singletonMap("0", null));
-  }
-
-  @Test
   public void testGetGrouperMetadata() {
     // Mocking setup.
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
@@ -179,17 +116,13 @@ public class TestJobModelManager {
     SystemStreamPartition testSystemStreamPartition1 = new SystemStreamPartition(new SystemStream("test-system-0", "test-stream-0"), new Partition(1));
     SystemStreamPartition testSystemStreamPartition2 = new SystemStreamPartition(new SystemStream("test-system-1", "test-stream-1"), new Partition(2));
 
-    Map<String, Map<String, String>> localityMappings = new HashMap<>();
-    localityMappings.put("0", ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "abc-affinity"));
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "abc-affinity"))));
 
     Map<SystemStreamPartition, List<String>> taskToSSPAssignments = ImmutableMap.of(testSystemStreamPartition1, ImmutableList.of("task-0", "task-1"),
                                                                                     testSystemStreamPartition2, ImmutableList.of("task-2", "task-3"));
 
     Map<String, String> taskAssignment = ImmutableMap.of("task-0", "0");
 
-    // Mock the container locality assignment.
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMappings);
-
     // Mock the task to partition assignment.
     when(mockTaskPartitionAssignmentManager.readTaskPartitionAssignments()).thenReturn(taskToSSPAssignments);
 
@@ -199,8 +132,8 @@ public class TestJobModelManager {
 
     GrouperMetadataImpl grouperMetadata = JobModelManager.getGrouperMetadata(new MapConfig(), mockLocalityManager, mockTaskAssignmentManager, mockTaskPartitionAssignmentManager);
 
-    Mockito.verify(mockLocalityManager).readContainerLocality();
-    Mockito.verify(mockTaskAssignmentManager).readTaskAssignment();
+    verify(mockLocalityManager).readLocality();
+    verify(mockTaskAssignmentManager).readTaskAssignment();
 
     Assert.assertEquals(ImmutableMap.of("0", new LocationId("abc-affinity")), grouperMetadata.getProcessorLocality());
     Assert.assertEquals(ImmutableMap.of(new TaskName("task-0"), new LocationId("abc-affinity")), grouperMetadata.getTaskLocality());
@@ -216,15 +149,14 @@ public class TestJobModelManager {
   public void testGetProcessorLocalityAllEntriesExisting() {
     Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, "2"));
 
-    Map<String, Map<String, String>> localityMappings = new HashMap<>();
-    localityMappings.put("0", ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "0-affinity"));
-    localityMappings.put("1", ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "1-affinity"));
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMappings);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of(
+        "0", new ProcessorLocality("0", "0-affinity"),
+        "1", new ProcessorLocality("1", "1-affinity"))));
 
     Map<String, LocationId> processorLocality = JobModelManager.getProcessorLocality(config, mockLocalityManager);
 
-    Mockito.verify(mockLocalityManager).readContainerLocality();
+    verify(mockLocalityManager).readLocality();
     ImmutableMap<String, LocationId> expected =
         ImmutableMap.of("0", new LocationId("0-affinity"), "1", new LocationId("1-affinity"));
     Assert.assertEquals(expected, processorLocality);
@@ -234,15 +166,13 @@ public class TestJobModelManager {
   public void testGetProcessorLocalityNewContainer() {
     Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, "2"));
 
-    Map<String, Map<String, String>> localityMappings = new HashMap<>();
-    // 2 containers, but only return 1 existing mapping
-    localityMappings.put("0", ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "abc-affinity"));
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMappings);
+    // 2 containers, but only return 1 existing mapping
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "abc-affinity"))));
 
     Map<String, LocationId> processorLocality = JobModelManager.getProcessorLocality(config, mockLocalityManager);
 
-    Mockito.verify(mockLocalityManager).readContainerLocality();
+    verify(mockLocalityManager).readLocality();
     ImmutableMap<String, LocationId> expected = ImmutableMap.of(
         // found entry in existing locality
         "0", new LocationId("abc-affinity"),
@@ -291,16 +221,16 @@ public class TestJobModelManager {
     systemStreamPartitions.add(new SystemStreamPartition(new SystemStream("test-system-3", "test-stream-3"), new Partition(2)));
 
     // Verifications
-    Mockito.verify(mockJobModel, atLeast(1)).getContainers();
-    Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
-    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMappings(ImmutableMap.of("test-container-id",
+    verify(mockJobModel, atLeast(1)).getContainers();
+    verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
+    verify(mockTaskAssignmentManager).writeTaskContainerMappings(ImmutableMap.of("test-container-id",
         ImmutableMap.of("task-1", TaskMode.Active, "task-2", TaskMode.Active, "task-3", TaskMode.Active, "task-4", TaskMode.Active)));
 
     // Verify that the old, stale partition mappings had been purged in the coordinator stream.
-    Mockito.verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions);
+    verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions);
 
     // Verify that the new task to partition assignment is stored in the coordinator stream.
-    Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(ImmutableMap.of(
+    verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(ImmutableMap.of(
         testSystemStreamPartition1, ImmutableList.of("task-1"),
         testSystemStreamPartition2, ImmutableList.of("task-2"),
         testSystemStreamPartition3, ImmutableList.of("task-3"),
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index ea57479..104b0ba 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -442,7 +442,7 @@ public class TestOperatorImplGraph {
     cms.put(cm0.getId(), cm0);
     cms.put(cm1.getId(), cm1);
 
-    JobModel jobModel = new JobModel(config, cms, null);
+    JobModel jobModel = new JobModel(config, cms);
     Multimap<SystemStream, String> streamToTasks = OperatorImplGraph.getStreamToConsumerTasks(jobModel);
     assertEquals(streamToTasks.get(ssp0.getSystemStream()).size(), 2);
     assertEquals(streamToTasks.get(ssp2.getSystemStream()).size(), 1);
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index b86da1f..c13da8d 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -38,9 +39,9 @@ import org.apache.samza.container.grouper.task.TaskAssignmentManager;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.rest.model.Task;
 import org.apache.samza.rest.proxy.installation.InstallationFinder;
@@ -137,7 +138,7 @@ public class SamzaTaskProxy implements TaskProxy {
   protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
     CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(consumer.getConfig(), new MetricsRegistryMap());
     LocalityManager localityManager = new LocalityManager(coordinatorStreamStore);
-    Map<String, Map<String, String>> containerIdToHostMapping = localityManager.readContainerLocality();
+    Map<String, ProcessorLocality> containerLocalities = localityManager.readLocality().getProcessorLocalities();
     TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskModeMapping.TYPE));
     Map<String, String> taskNameToContainerIdMapping = taskAssignmentManager.readTaskAssignment();
     StorageConfig storageConfig = new StorageConfig(consumer.getConfig());
@@ -145,7 +146,9 @@ public class SamzaTaskProxy implements TaskProxy {
     return taskNameToContainerIdMapping.entrySet()
         .stream()
         .map(entry -> {
-          String hostName = containerIdToHostMapping.get(entry.getValue()).get(SetContainerHostMapping.HOST_KEY);
+          String hostName = Optional.ofNullable(containerLocalities.get(entry.getValue()))
+              .map(ProcessorLocality::host)
+              .orElse(null);
           return new Task(hostName, entry.getKey(), entry.getValue(), new ArrayList<>(), storeNames);
         }).collect(Collectors.toList());
   }
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 46f345d..3f7056e 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -142,7 +142,7 @@ object TestKeyValuePerformance extends Logging {
           new TaskInstanceCollector(producerMultiplexer),
           new MetricsRegistryMap,
           null,
-          JobContextImpl.fromConfigWithDefaults(storageConfig),
+          JobContextImpl.fromConfigWithDefaults(storageConfig, null),
           new ContainerContextImpl(new ContainerModel("0", tasks.asJava), new MetricsRegistryMap), StoreMode.ReadWrite
         )
 
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 2b31977..74fef67 100644
--- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -19,10 +19,10 @@
 
 package org.apache.samza.validation;
 
-import java.util.Map;
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -35,17 +35,18 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.job.yarn.ClientHelper;
 import org.apache.samza.metrics.JmxMetricsAccessor;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsValidator;
-import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.util.CommandLine;
-import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.ReflectionUtil;
 import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.slf4j.Logger;
@@ -158,23 +159,24 @@ public class YarnJobValidationTool {
     CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, metricsRegistry);
     coordinatorStreamStore.init();
     try {
-      Config configFromCoordinatorStream = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
-      ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamStore);
-      JobModelManager jobModelManager =
-          JobModelManager.apply(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(),
-              coordinatorStreamStore, metricsRegistry);
+      LocalityManager localityManager =
+          new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE));
       validator.init(config);
-      Map<String, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
-      for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
-        String containerId = entry.getKey();
-        String jmxUrl = entry.getValue();
-        log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
-        JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
-        jmxMetrics.connect();
-        validator.validate(jmxMetrics);
-        jmxMetrics.close();
-        log.info("validate container " + containerId + " successfully");
+      LocalityModel localityModel = localityManager.readLocality();
+
+      for (ProcessorLocality processorLocality : localityModel.getProcessorLocalities().values()) {
+        String containerId = processorLocality.id();
+        String jmxUrl = processorLocality.jmxTunnelingUrl();
+        if (StringUtils.isNotBlank(jmxUrl)) {
+          log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
+          JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
+          jmxMetrics.connect();
+          validator.validate(jmxMetrics);
+          jmxMetrics.close();
+          log.info("validate container " + containerId + " successfully");
+        }
       }
+
       validator.complete();
     } finally {
       coordinatorStreamStore.close();
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index 7fe6305..d01b20f 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -137,8 +137,7 @@
               %td
                 Up time: #{container.upTimeStr()}
               %td
-                Ordinary: #{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(processorId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_URL_KEY)}
-                Tunneling: #{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(processorId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)}
+                %a(target="_blank" href="#{state.coordinatorUrl.toString}locality?processorId=#{processorId.toString}") JMX
 
       %h2 Failed Containers
       %table.table.table-striped.table-bordered.tablesorter#containers-table
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestLocalityServlet.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestLocalityServlet.java
new file mode 100644
index 0000000..56d7ae1
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestLocalityServlet.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.webapp;
+
+import com.google.common.collect.ImmutableMap;
+import java.net.URL;
+import java.util.Collections;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.coordinator.server.LocalityServlet;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.HttpUtil;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * A test class for {@link LocalityServlet}. It validates the servlet directly, over HTTP against a mocked
+ * {@link LocalityManager}, and the Serde Mix-In of {@link ProcessorLocality} indirectly.
+ */
+public class TestLocalityServlet {
+  private static final String PROCESSOR_ID1 = "1";
+  private static final String PROCESSOR_ID2 = "2";
+  private static final String HOST1 = "host1";
+  private static final String HOST2 = "host2";
+  private static final String JMX_URL = "jmx";
+  private static final String TUNNELING_URL = "tunneling";
+
+  private static final ProcessorLocality PROCESSOR_1_LOCALITY =
+      new ProcessorLocality(PROCESSOR_ID1, HOST1, JMX_URL, TUNNELING_URL);
+  private static final ProcessorLocality PROCESSOR_2_LOCALITY =
+      new ProcessorLocality(PROCESSOR_ID2, HOST2, JMX_URL, TUNNELING_URL);
+
+  private final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  private HttpServer webApp;
+  private LocalityManager localityManager;
+
+  @Before
+  public void setup() throws Exception {
+    localityManager = mock(LocalityManager.class);
+    when(localityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of(PROCESSOR_ID1, PROCESSOR_1_LOCALITY, PROCESSOR_ID2, PROCESSOR_2_LOCALITY)));
+    webApp = new HttpServer("/", 0, "", new ServletHolder(new DefaultServlet()));
+    webApp.addServlet("/locality", new LocalityServlet(localityManager));
+    webApp.start();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    webApp.stop();
+  }
+
+  /** Without a processorId parameter, the response holds the locality of every processor. */
+  @Test
+  public void testReadContainerLocality() throws Exception {
+    URL url = new URL(webApp.getUrl().toString() + "locality");
+
+    String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
+    LocalityModel locality = mapper.readValue(response, LocalityModel.class);
+
+    assertEquals("Expected locality for two containers", 2, locality.getProcessorLocalities().size());
+    assertEquals("Mismatch in locality for processor " + PROCESSOR_ID1,
+        PROCESSOR_1_LOCALITY, locality.getProcessorLocality(PROCESSOR_ID1));
+    assertEquals("Mismatch in locality for processor " + PROCESSOR_ID2,
+        PROCESSOR_2_LOCALITY, locality.getProcessorLocality(PROCESSOR_ID2));
+  }
+
+  /** When the manager reports no localities, the response deserializes to an empty {@link LocalityModel}. */
+  @Test
+  public void testReadContainerLocalityWithNoLocality() throws Exception {
+    final LocalityModel expectedLocality = new LocalityModel(Collections.emptyMap());
+    URL url = new URL(webApp.getUrl().toString() + "locality");
+    when(localityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of()));
+
+    String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
+    LocalityModel locality = mapper.readValue(response, LocalityModel.class);
+
+    assertEquals("Expected empty response but got " + locality, expectedLocality, locality);
+  }
+
+  /** With a processorId parameter, the response is that single processor's {@link ProcessorLocality}. */
+  @Test
+  public void testReadProcessorLocality() throws Exception {
+    URL url = new URL(webApp.getUrl().toString() + "locality?processorId=" + PROCESSOR_ID1);
+    String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
+
+    assertEquals("Mismatch in the locality for processor " + PROCESSOR_ID1,
+        PROCESSOR_1_LOCALITY, mapper.readValue(response, ProcessorLocality.class));
+  }
+
+  /** A processor with no stored locality is served as {@code new ProcessorLocality(processorId, "")}. */
+  @Test
+  public void testReadProcessorLocalityWithNoLocality() throws Exception {
+    final ProcessorLocality expectedProcessorLocality = new ProcessorLocality(PROCESSOR_ID2, "");
+    URL url = new URL(webApp.getUrl().toString() + "locality?processorId=" + PROCESSOR_ID2);
+    when(localityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of()));
+
+    String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
+    ProcessorLocality processorLocality = mapper.readValue(response, ProcessorLocality.class);
+
+    assertEquals("Expected empty response for processor locality " + PROCESSOR_ID2 + " but got " + processorLocality,
+        expectedProcessorLocality, processorLocality);
+  }
+}