You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/06/02 23:14:38 UTC

samza git commit: SAMZA-618 - Add container-to-host mapping messages to config stream

Repository: samza
Updated Branches:
  refs/heads/master f77595804 -> f320a4306


SAMZA-618 - Add container-to-host mapping messages to config stream


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f320a430
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f320a430
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f320a430

Branch: refs/heads/master
Commit: f320a430692e1b168106f721b4f73f6cfa90e2e1
Parents: f775958
Author: Navina <na...@gmail.com>
Authored: Tue Jun 2 14:13:56 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Tue Jun 2 14:13:56 2015 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  2 +-
 .../apache/samza/container/LocalityManager.java | 90 ++++++++++++++++++++
 .../stream/CoordinatorStreamMessage.java        | 64 +++++++++++---
 .../org/apache/samza/job/model/JobModel.java    |  7 ++
 .../apache/samza/container/SamzaContainer.scala | 35 +++++++-
 .../samza/coordinator/JobCoordinator.scala      | 56 ++++++++----
 .../samza/container/TestSamzaContainer.scala    | 12 +--
 7 files changed, 229 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5f8e103..3374f0c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -99,7 +99,7 @@
 
     <subpackage name="container">
         <allow pkg="org.apache.samza.config" />
-
+        <allow pkg="org.apache.samza.coordinator.stream" />
         <subpackage name="grouper">
             <subpackage name="stream">
                 <allow pkg="org.apache.samza.container" />

http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
new file mode 100644
index 0000000..e661e12
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping;
+
+/**
+ * Locality Manager persists and reads the container-to-host assignment information from the coordinator stream.
+ */
+public class LocalityManager {
+  private static final Logger log = LoggerFactory.getLogger(LocalityManager.class);
+  private static final String SOURCE = "SamzaContainer-";
+  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+  private Map<Integer, String> containerToHostMapping = Collections.emptyMap();
+
+  public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+                         CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+    this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+    this.coordinatorStreamProducer = coordinatorStreamProducer;
+  }
+
+  /** Starts the coordinator stream producer, then the consumer. */
+  public void start() {
+    coordinatorStreamProducer.start();
+    coordinatorStreamConsumer.start();
+  }
+
+  /** Stops the coordinator stream consumer, then the producer. */
+  public void stop() {
+    coordinatorStreamConsumer.stop();
+    coordinatorStreamProducer.stop();
+  }
+
+  /** Registers the consumer, and the producer under the source "SamzaContainer-" + sourceSuffix (the containerId). */
+  public void register(String sourceSuffix) {
+    coordinatorStreamConsumer.register();
+    coordinatorStreamProducer.register(LocalityManager.SOURCE + sourceSuffix);
+  }
+
+  /** Reads all containerId-to-host mappings from the coordinator stream, caches a read-only copy and returns them. */
+  public Map<Integer, String> readContainerLocality() {
+    Map<Integer, String> allMappings = new HashMap<Integer, String>();
+    for (CoordinatorStreamMessage message: coordinatorStreamConsumer.getBootstrappedStream(SetContainerHostMapping.TYPE)) {
+      SetContainerHostMapping mapping = new SetContainerHostMapping(message);
+      allMappings.put(Integer.parseInt(mapping.getKey()), mapping.getHostLocality());
+    }
+    containerToHostMapping = Collections.unmodifiableMap(new HashMap<Integer, String>(allMappings)); // not aliased to the result
+    return allMappings;
+  }
+
+  /** Sends the container's host address to the coordinator stream and logs whether the container moved hosts. */
+  public void writeContainerToHostMapping(Integer containerId, String hostHttpAddress) {
+    String existingMapping = containerToHostMapping.get(containerId);
+    if (existingMapping != null && !existingMapping.equals(hostHttpAddress)) {
+      log.info("Container {} moved from {} to {}", new Object[]{containerId, existingMapping, hostHttpAddress});
+    } else {
+      log.info("Container {} started at {}", containerId, hostHttpAddress);
+    }
+    coordinatorStreamProducer.send(new SetContainerHostMapping(SOURCE + containerId, String.valueOf(containerId), hostHttpAddress));
+    Map<Integer, String> updated = new HashMap<Integer, String>(containerToHostMapping);
+    updated.put(containerId, hostHttpAddress);
+    containerToHostMapping = Collections.unmodifiableMap(updated); // copy-on-write: put() on the read-only cache would throw
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
index 0988ded..6c1e488 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -41,14 +41,14 @@ import org.slf4j.LoggerFactory;
  * pre-defined fields (such as timestamp, host, etc) for the value map, which
  * are common to all messages.
  * </p>
- * 
+ *
  * <p>
  * The full structure for a CoordinatorStreamMessage is:
  * </p>
- * 
+ *
  * <pre>
  * key =&gt; [1, "set-config", "job.name"] 
- * 
+ *
  * message =&gt; {
  *   "host": "192.168.0.1",
  *   "username": "criccomini",
@@ -56,16 +56,16 @@ import org.slf4j.LoggerFactory;
  *   "timestamp": 123456789,
  *   "values": {
  *     "value": "my-job-name"
- *   } 
+ *   }
  * }
  * </pre>
- * 
+ *
  * Where the key's structure is:
- * 
+ *
  * <pre>
  * key =&gt; [&lt;version&gt;, &lt;type&gt;, &lt;key&gt;]
  * </pre>
- * 
+ *
  * <p>
  * Note that the white space in the above JSON blobs are done for legibility.
  * Over the wire, the JSON should be compact, and no unnecessary white space
@@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
  * be evaluated as two different keys, and Kafka will not log compact them (if
  * Kafka is used as the underlying system for a coordinator stream).
  * </p>
- * 
+ *
  * <p>
  * The "values" map in the message is defined on a per-message-type basis. For
  * set-config messages, there is just a single key/value pair, where the "value"
@@ -82,7 +82,7 @@ import org.slf4j.LoggerFactory;
  * in "values" (one for each SystemStreamPartition/offset pair for a given
  * TaskName).
  * </p>
- * 
+ *
  * <p>
  * The most important fields are type, key, and values. The type field (defined
  * as index 1 in the key list) defines the kind of message, the key (defined as
@@ -213,7 +213,7 @@ public class CoordinatorStreamMessage {
    * The type of the message is used to convert a generic
    * CoordinatorStreaMessage into a specific message, such as a SetConfig
    * message.
-   * 
+   *
    * @return The type of the message.
    */
   public String getType() {
@@ -356,17 +356,17 @@ public class CoordinatorStreamMessage {
      * Considering Kafka's log compaction, for example, the keys of a message
      * and its delete key must match exactly:
      * </p>
-     * 
+     *
      * <pre>
      * k=&gt;[1,"job.name","set-config"] .. v=&gt; {..some stuff..}
      * v=&gt;[1,"job.name","set-config"] .. v=&gt; null
      * </pre>
-     * 
+     *
      * <p>
      * Deletes are modeled as a CoordinatorStreamMessage with a null message
      * map, and a key that's identical to the key map that's to be deleted.
      * </p>
-     * 
+     *
      * @param source
      *          The source ID of the sender of the delete message.
      * @param key
@@ -473,4 +473,42 @@ public class CoordinatorStreamMessage {
       return Integer.parseInt(getMessageValue("Partition"));
     }
   }
+
+  /**
+   * SetContainerHostMapping is used internally by the Samza framework to
+   * persist the container-to-host mappings.
+   *
+   * Structure of the message looks like:
+   * <pre>
+   * {
+   *     Key: $ContainerId
+   *     Type: set-container-host-assignment
+   *     Source: "SamzaContainer-$ContainerId"
+   *     MessageMap: { ip: InetAddressString }
+   * }
+   * </pre>
+   */
+  public static class SetContainerHostMapping extends CoordinatorStreamMessage {
+    public static final String TYPE = "set-container-host-assignment";
+    // Key in the message map under which the host address is stored.
+    private static final String IP_KEY = "ip";
+
+    /** Re-wraps a generic message, reusing its key array and message map. */
+    public SetContainerHostMapping(CoordinatorStreamMessage message) {
+      super(message.getKeyArray(), message.getMessageMap());
+    }
+
+    /** Builds a message of type {@link #TYPE} with the given source and key, storing hostHttpAddress under "ip". */
+    public SetContainerHostMapping(String source, String key, String hostHttpAddress) {
+      super(source);
+      setType(TYPE);
+      setKey(key);
+      putMessageValue(IP_KEY, hostHttpAddress);
+    }
+
+    /** Returns the message value stored under the "ip" key. */
+    public String getHostLocality() {
+      return getMessageValue(IP_KEY);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index fa113e1..95a2ce5 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -20,6 +20,7 @@
 package org.apache.samza.job.model;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
 
@@ -38,12 +39,18 @@ import org.apache.samza.config.Config;
 public class JobModel {
   private final Config config;
   private final Map<Integer, ContainerModel> containers;
+  private final Map<Integer, String> containerToHostMapping;
 
   public int maxChangeLogStreamPartitions;
 
   public JobModel(Config config, Map<Integer, ContainerModel> containers) {
+    this(config, containers, new HashMap<Integer, String>());
+  }
+
+  public JobModel(Config config, Map<Integer, ContainerModel> containers, Map<Integer, String> containerToHostMapping) {
     this.config = config;
     this.containers = Collections.unmodifiableMap(containers);
+    this.containerToHostMapping = Collections.unmodifiableMap(containerToHostMapping);
 
     // Compute the number of change log stream partitions as the maximum partition-id
     // of all total number of tasks of the job; Increment by 1 because partition ids

http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 50e53fb..cbacd18 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -56,7 +56,7 @@ import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
-import java.net.URL
+import java.net.{UnknownHostException, InetAddress, URL}
 import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.config.JobConfig.Config2Job
@@ -331,6 +331,7 @@ object SamzaContainer extends Logging {
     val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry)
     val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry)
     val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, String.valueOf(containerId))
+    val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
 
     info("Got checkpoint manager: %s" format checkpointManager)
 
@@ -531,11 +532,13 @@ object SamzaContainer extends Logging {
     info("Samza container setup complete.")
 
     new SamzaContainer(
+      containerContext = containerContext,
       taskInstances = taskInstances,
       runLoop = runLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       offsetManager = offsetManager,
+      localityManager = localityManager,
       metrics = samzaContainerMetrics,
       reporters = reporters,
       jvm = jvm)
@@ -543,12 +546,14 @@ object SamzaContainer extends Logging {
 }
 
 class SamzaContainer(
+  containerContext: SamzaContainerContext,
   taskInstances: Map[TaskName, TaskInstance],
   runLoop: RunLoop,
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
   metrics: SamzaContainerMetrics,
   offsetManager: OffsetManager = new OffsetManager,
+  localityManager: LocalityManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
   jvm: JvmMetrics = null) extends Runnable with Logging {
 
@@ -558,6 +563,7 @@ class SamzaContainer(
 
       startMetrics
       startOffsetManager
+      startLocalityManager
       startStores
       startProducers
       startTask
@@ -576,6 +582,7 @@ class SamzaContainer(
       shutdownTask
       shutdownProducers
       shutdownStores
+      shutdownLocalityManager
       shutdownOffsetManager
       shutdownMetrics
 
@@ -612,6 +619,25 @@ class SamzaContainer(
     offsetManager.start
   }
 
+  def startLocalityManager {
+    if(localityManager != null) {
+      info("Registering localityManager for the container")
+      localityManager.start
+      localityManager.register(String.valueOf(containerContext.id))
+      // Best effort: a failure while resolving or persisting the host address is only logged below, never rethrown.
+      info("Writing container locality to Coordinator Stream")
+      try {
+        val hostInetAddress = InetAddress.getLocalHost.getHostAddress
+        localityManager.writeContainerToHostMapping(containerContext.id, hostInetAddress)
+      } catch {
+        case uhe: UnknownHostException =>
+          warn("Received UnknownHostException when persisting locality info for container %d: %s" format (containerContext.id, uhe.getMessage))  // logged only; locality is simply not written
+        case unknownException: Throwable =>
+          warn("Received an exception when persisting locality info for container %d: %s" format (containerContext.id, unknownException.getMessage))
+      }
+    }
+  }
+
   def startStores {
     info("Starting task instance stores.")
 
@@ -668,6 +694,13 @@ class SamzaContainer(
     taskInstances.values.foreach(_.shutdownStores)
   }
 
+  def shutdownLocalityManager {
+    if(localityManager != null) {  // localityManager defaults to null in the constructor
+      info("Shutting down locality manager.")
+      localityManager.stop
+    }
+  }
+
   def shutdownOffsetManager {
     info("Shutting down offset manager.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 5b43b58..8ee034a 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -26,7 +26,7 @@ import org.apache.samza.SamzaException
 import org.apache.samza.container.grouper.task.GroupByContainerCount
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
 import java.util
-import org.apache.samza.container.TaskName
+import org.apache.samza.container.{LocalityManager, TaskName}
 import org.apache.samza.storage.ChangelogPartitionManager
 import org.apache.samza.util.Logging
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -73,7 +73,8 @@ object JobCoordinator extends Logging {
     info("Got config: %s" format config)
     val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
     val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
-    getJobCoordinator(rewriteConfig(config), checkpointManager, changelogManager)
+    val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
+    getJobCoordinator(rewriteConfig(config), checkpointManager, changelogManager, localityManager)
   }
 
   def apply(coordinatorSystemConfig: Config): JobCoordinator = apply(coordinatorSystemConfig, new MetricsRegistryMap())
@@ -81,9 +82,12 @@ object JobCoordinator extends Logging {
   /**
    * Build a JobCoordinator using a Samza job's configuration.
    */
-  def getJobCoordinator(config: Config, checkpointManager: CheckpointManager, changelogManager: ChangelogPartitionManager) = {
+  def getJobCoordinator(config: Config,
+                        checkpointManager: CheckpointManager,
+                        changelogManager: ChangelogPartitionManager,
+                        localityManager: LocalityManager) = {
     val containerCount = config.getContainerCount
-    val jobModelGenerator = initializeJobModel(config, containerCount, checkpointManager, changelogManager)
+    val jobModelGenerator = initializeJobModel(config, containerCount, checkpointManager, changelogManager, localityManager)
     val server = new HttpServer
     server.addServlet("/*", new JobServlet(jobModelGenerator))
     new JobCoordinator(jobModelGenerator(), server, checkpointManager)
@@ -157,7 +161,8 @@ object JobCoordinator extends Logging {
   private def initializeJobModel(config: Config,
                                  containerCount: Int,
                                  checkpointManager: CheckpointManager,
-                                 changelogManager: ChangelogPartitionManager): () => JobModel = {
+                                 changelogManager: ChangelogPartitionManager,
+                                 localityManager: LocalityManager): () => JobModel = {
     // TODO containerCount should go away when we generalize the job coordinator,
     // and have a non-yarn-specific way of specifying container count.
 
@@ -168,24 +173,31 @@ object JobCoordinator extends Logging {
     val groups = grouper.group(allSystemStreamPartitions)
 
     // Initialize the ChangelogPartitionManager and the CheckpointManager
-    val previousChangelogeMapping = if (changelogManager != null)
+    val previousChangelogMapping = if (changelogManager != null)
     {
-      changelogManager.start
-      changelogManager.readChangeLogPartitionMapping
+      changelogManager.start()
+      changelogManager.readChangeLogPartitionMapping()
     }
     else
     {
-      new util.HashMap[TaskName, java.lang.Integer]()
+      new util.HashMap[TaskName, Integer]()
     }
-    checkpointManager.start
+
+    checkpointManager.start()
     groups.foreach(taskSSP => checkpointManager.register(taskSSP._1))
 
+    // NOTE: localityManager is built from the same coordinator stream producer/consumer instances as the checkpoint and changelog managers, but start() is still called on it below.
+    // TODO: This code will go away with refactoring - SAMZA-678
+
+    localityManager.start()
+
     // Generate the jobModel
     def jobModelGenerator(): JobModel = refreshJobModel(config,
                                                         allSystemStreamPartitions,
                                                         checkpointManager,
                                                         groups,
-                                                        previousChangelogeMapping,
+                                                        previousChangelogMapping,
+                                                        localityManager,
                                                         containerCount)
 
     val jobModel = jobModelGenerator()
@@ -194,14 +206,14 @@ object JobCoordinator extends Logging {
     if (changelogManager != null)
     {
       // newChangelogMapping is the merging of all current task:changelog
-      // assignments with whatever we had before (previousChangelogeMapping).
+      // assignments with whatever we had before (previousChangelogMapping).
       // We must persist legacy changelog assignments so that
       // maxChangelogPartitionId always has the absolute max, not the current
       // max (in case the task with the highest changelog partition mapping
       // disappears.
       val newChangelogMapping = jobModel.getContainers.flatMap(_._2.getTasks).map{case (taskName,taskModel) => {
                                                  taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
-                                               }}.toMap ++ previousChangelogeMapping
+                                               }}.toMap ++ previousChangelogMapping
       info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
       changelogManager.writeChangeLogPartitionMapping(newChangelogMapping)
     }
@@ -219,13 +231,14 @@ object JobCoordinator extends Logging {
                               allSystemStreamPartitions: util.Set[SystemStreamPartition],
                               checkpointManager: CheckpointManager,
                               groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
-                              previousChangelogeMapping: util.Map[TaskName, Integer],
+                              previousChangelogMapping: util.Map[TaskName, Integer],
+                              localityManager: LocalityManager,
                               containerCount: Int): JobModel = {
     this.synchronized
     {
       // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
       // mapping.
-      var maxChangelogPartitionId = previousChangelogeMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+      var maxChangelogPartitionId = previousChangelogMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
 
       // Assign all SystemStreamPartitions to TaskNames.
       val taskModels =
@@ -235,7 +248,7 @@ object JobCoordinator extends Logging {
                   val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]()))
                   // Find the system partitions which don't have a checkpoint and set null for the values for offsets
                   val offsetMap = systemStreamPartitions.map(ssp => (ssp -> null)).toMap ++ checkpoint.getOffsets
-                  val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match
+                  val changelogPartition = Option(previousChangelogMapping.get(taskName)) match
                   {
                     case Some(changelogPartitionId) => new Partition(changelogPartitionId)
                     case _ =>
@@ -255,7 +268,16 @@ object JobCoordinator extends Logging {
       val containerModels = containerGrouper.group(taskModels).map
               { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
 
-      new JobModel(config, containerModels)
+      val containerLocality = if(localityManager != null) {
+        localityManager.readContainerLocality()
+      } else {
+        new util.HashMap[Integer, String]()
+      }
+
+      containerLocality.foreach{case (container: Integer, location: String) =>
+        info("Container id %d  -->  %s" format (container.intValue(), location))
+      }
+      new JobModel(config, containerModels, containerLocality)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a7fa085..9fb1aa9 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -161,11 +161,13 @@ class TestSamzaContainer extends AssertionsForJUnit {
       consumerMultiplexer = consumerMultiplexer,
       metrics = new SamzaContainerMetrics)
     val container = new SamzaContainer(
-      Map(taskName -> taskInstance),
-      runLoop,
-      consumerMultiplexer,
-      producerMultiplexer,
-      new SamzaContainerMetrics)
+      containerContext = containerContext,
+      taskInstances = Map(taskName -> taskInstance),
+      runLoop = runLoop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      metrics = new SamzaContainerMetrics
+    )
     try {
       container.run
       fail("Expected exception to be thrown in run method.")