Posted to commits@samza.apache.org by na...@apache.org on 2015/06/02 23:14:38 UTC
samza git commit: SAMZA-618 - Add container-to-host mapping messages to config stream
Repository: samza
Updated Branches:
refs/heads/master f77595804 -> f320a4306
SAMZA-618 - Add container-to-host mapping messages to config stream
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f320a430
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f320a430
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f320a430
Branch: refs/heads/master
Commit: f320a430692e1b168106f721b4f73f6cfa90e2e1
Parents: f775958
Author: Navina <na...@gmail.com>
Authored: Tue Jun 2 14:13:56 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Tue Jun 2 14:13:56 2015 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 2 +-
.../apache/samza/container/LocalityManager.java | 90 ++++++++++++++++++++
.../stream/CoordinatorStreamMessage.java | 64 +++++++++++---
.../org/apache/samza/job/model/JobModel.java | 7 ++
.../apache/samza/container/SamzaContainer.scala | 35 +++++++-
.../samza/coordinator/JobCoordinator.scala | 56 ++++++++----
.../samza/container/TestSamzaContainer.scala | 12 +--
7 files changed, 229 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5f8e103..3374f0c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -99,7 +99,7 @@
<subpackage name="container">
<allow pkg="org.apache.samza.config" />
-
+ <allow pkg="org.apache.samza.coordinator.stream" />
<subpackage name="grouper">
<subpackage name="stream">
<allow pkg="org.apache.samza.container" />
http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
new file mode 100644
index 0000000..e661e12
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping;
+
+/**
+ * LocalityManager is used to persist and read container-to-host
+ * assignment information from the coordinator stream.
+ */
+public class LocalityManager {
+ private static final Logger log = LoggerFactory.getLogger(LocalityManager.class);
+ private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+ private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+ private static final String SOURCE = "SamzaContainer-";
+ private Map<Integer, String> containerToHostMapping;
+
+ public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+ CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+ this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+ this.coordinatorStreamProducer = coordinatorStreamProducer;
+ this.containerToHostMapping = new HashMap<Integer, String>();
+ }
+
+ public void start() {
+ coordinatorStreamProducer.start();
+ coordinatorStreamConsumer.start();
+ }
+
+ public void stop() {
+ coordinatorStreamConsumer.stop();
+ coordinatorStreamProducer.stop();
+ }
+
+ /**
+ * Registers the consumer and the producer; the producer source name is
+ * suffixed with the container id.
+ */
+ public void register(String sourceSuffix) {
+ coordinatorStreamConsumer.register();
+ coordinatorStreamProducer.register(LocalityManager.SOURCE + sourceSuffix);
+ }
+
+ public Map<Integer, String> readContainerLocality() {
+ Map<Integer, String> allMappings = new HashMap<Integer, String>();
+ for (CoordinatorStreamMessage message: coordinatorStreamConsumer.getBootstrappedStream(SetContainerHostMapping.TYPE)) {
+ SetContainerHostMapping mapping = new SetContainerHostMapping(message);
+ allMappings.put(Integer.parseInt(mapping.getKey()), mapping.getHostLocality());
+ }
+ containerToHostMapping = Collections.unmodifiableMap(allMappings);
+ return allMappings;
+ }
+
+ public void writeContainerToHostMapping(Integer containerId, String hostHttpAddress) {
+ String existingMapping = containerToHostMapping.get(containerId);
+ if (existingMapping != null && !existingMapping.equals(hostHttpAddress)) {
+ log.info("Container {} moved from {} to {}", new Object[]{containerId, existingMapping, hostHttpAddress});
+ } else {
+ log.info("Container {} started at {}", containerId, hostHttpAddress);
+ }
+ coordinatorStreamProducer.send(new SetContainerHostMapping(SOURCE + containerId, String.valueOf(containerId), hostHttpAddress));
+ containerToHostMapping.put(containerId, hostHttpAddress);
+ }
+
+}
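For context, a minimal usage sketch of the new LocalityManager follows (a sketch only, not part of the commit; config, registry, containerId, and hostAddress are assumed to be in scope). The factory calls mirror the SamzaContainer wiring further down in this diff:

    CoordinatorStreamSystemFactory factory = new CoordinatorStreamSystemFactory();
    CoordinatorStreamSystemConsumer consumer =
        factory.getCoordinatorStreamSystemConsumer(config, registry);
    CoordinatorStreamSystemProducer producer =
        factory.getCoordinatorStreamSystemProducer(config, registry);

    LocalityManager localityManager = new LocalityManager(producer, consumer);
    localityManager.start();
    // Producer source becomes "SamzaContainer-<containerId>".
    localityManager.register(String.valueOf(containerId));
    // Persist this container's host, then read back the full mapping.
    localityManager.writeContainerToHostMapping(containerId, hostAddress);
    Map<Integer, String> mappings = localityManager.readContainerLocality();
    localityManager.stop();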
http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
index 0988ded..6c1e488 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -41,14 +41,14 @@ import org.slf4j.LoggerFactory;
* pre-defined fields (such as timestamp, host, etc) for the value map, which
* are common to all messages.
* </p>
- *
+ *
* <p>
* The full structure for a CoordinatorStreamMessage is:
* </p>
- *
+ *
* <pre>
* key => [1, "set-config", "job.name"]
- *
+ *
* message => {
* "host": "192.168.0.1",
* "username": "criccomini",
@@ -56,16 +56,16 @@ import org.slf4j.LoggerFactory;
* "timestamp": 123456789,
* "values": {
* "value": "my-job-name"
- * }
+ * }
* }
* </pre>
- *
+ *
* Where the key's structure is:
- *
+ *
* <pre>
* key => [<version>, <type>, <key>]
* </pre>
- *
+ *
* <p>
* Note that the white space in the above JSON blobs is for legibility.
* Over the wire, the JSON should be compact, and no unnecessary white space
@@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
* be evaluated as two different keys, and Kafka will not log compact them (if
* Kafka is used as the underlying system for a coordinator stream).
* </p>
- *
+ *
* <p>
* The "values" map in the message is defined on a per-message-type basis. For
* set-config messages, there is just a single key/value pair, where the "value"
@@ -82,7 +82,7 @@ import org.slf4j.LoggerFactory;
* in "values" (one for each SystemStreamPartition/offset pair for a given
* TaskName).
* </p>
- *
+ *
* <p>
* The most important fields are type, key, and values. The type field (defined
* as index 1 in the key list) defines the kind of message, the key (defined as
@@ -213,7 +213,7 @@ public class CoordinatorStreamMessage {
* The type of the message is used to convert a generic
* CoordinatorStreamMessage into a specific message, such as a SetConfig
* message.
- *
+ *
* @return The type of the message.
*/
public String getType() {
@@ -356,17 +356,17 @@ public class CoordinatorStreamMessage {
* Considering Kafka's log compaction, for example, the keys of a message
* and its delete key must match exactly:
* </p>
- *
+ *
* <pre>
* k=>[1,"job.name","set-config"] .. v=> {..some stuff..}
* v=>[1,"job.name","set-config"] .. v=> null
* </pre>
- *
+ *
* <p>
* Deletes are modeled as a CoordinatorStreamMessage with a null message
* map, and a key that's identical to the key map that's to be deleted.
* </p>
- *
+ *
* @param source
* The source ID of the sender of the delete message.
* @param key
@@ -473,4 +473,42 @@ public class CoordinatorStreamMessage {
return Integer.parseInt(getMessageValue("Partition"));
}
}
+
+ /**
+ * SetContainerHostMapping is used internally by the Samza framework to
+ * persist the container-to-host mappings.
+ *
+ * The structure of the message is:
+ * {
+ * Key: $ContainerId
+ * Type: set-container-host-assignment
+ * Source: "SamzaContainer-$ContainerId"
+ * MessageMap:
+ * {
+ * ip: InetAddressString
+ * }
+ * }
+ */
+ public static class SetContainerHostMapping extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-container-host-assignment";
+ private static final String IP_KEY = "ip";
+
+ public SetContainerHostMapping(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ public SetContainerHostMapping(String source, String key, String hostHttpAddress) {
+ super(source);
+ setType(TYPE);
+ setKey(key);
+ putMessageValue(IP_KEY, hostHttpAddress);
+ }
+
+ public String getHostLocality() {
+ return getMessageValue(IP_KEY);
+ }
+ }
}
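For context, a round-trip sketch of the new message type (a sketch only; genericMessage stands for a CoordinatorStreamMessage bootstrapped from the coordinator stream, as in LocalityManager.readContainerLocality()):

    // Write side: the key is the container id; the value map carries the "ip" field.
    SetContainerHostMapping out =
        new SetContainerHostMapping("SamzaContainer-2", "2", "192.168.0.1");

    // Read side: re-wrap a generic message whose type is "set-container-host-assignment".
    SetContainerHostMapping in = new SetContainerHostMapping(genericMessage);
    String host = in.getHostLocality(); // "192.168.0.1"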
http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index fa113e1..95a2ce5 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -20,6 +20,7 @@
package org.apache.samza.job.model;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
@@ -38,12 +39,18 @@ import org.apache.samza.config.Config;
public class JobModel {
private final Config config;
private final Map<Integer, ContainerModel> containers;
+ private final Map<Integer, String> containerToHostMapping;
public int maxChangeLogStreamPartitions;
public JobModel(Config config, Map<Integer, ContainerModel> containers) {
+ this(config, containers, new HashMap<Integer, String>());
+ }
+
+ public JobModel(Config config, Map<Integer, ContainerModel> containers, Map<Integer, String> containerToHostMapping) {
this.config = config;
this.containers = Collections.unmodifiableMap(containers);
+ this.containerToHostMapping = Collections.unmodifiableMap(containerToHostMapping);
// Compute the number of change log stream partitions as the maximum partition-id
// of all total number of tasks of the job; Increment by 1 because partition ids
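The added constructor keeps existing call sites source-compatible; a sketch of both forms (containers and containerLocality assumed in scope):

    // Old two-arg form still compiles; it delegates with an empty locality map.
    JobModel model = new JobModel(config, containers);

    // New three-arg form carries the mapping read from the coordinator stream.
    JobModel modelWithLocality = new JobModel(config, containers, containerLocality);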
http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 50e53fb..cbacd18 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -56,7 +56,7 @@ import org.apache.samza.task.TaskInstanceCollector
import org.apache.samza.util.Logging
import org.apache.samza.util.Util
import scala.collection.JavaConversions._
-import java.net.URL
+import java.net.{UnknownHostException, InetAddress, URL}
import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.config.JobConfig.Config2Job
@@ -331,6 +331,7 @@ object SamzaContainer extends Logging {
val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry)
val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry)
val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, String.valueOf(containerId))
+ val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
info("Got checkpoint manager: %s" format checkpointManager)
@@ -531,11 +532,13 @@ object SamzaContainer extends Logging {
info("Samza container setup complete.")
new SamzaContainer(
+ containerContext = containerContext,
taskInstances = taskInstances,
runLoop = runLoop,
consumerMultiplexer = consumerMultiplexer,
producerMultiplexer = producerMultiplexer,
offsetManager = offsetManager,
+ localityManager = localityManager,
metrics = samzaContainerMetrics,
reporters = reporters,
jvm = jvm)
@@ -543,12 +546,14 @@ object SamzaContainer extends Logging {
}
class SamzaContainer(
+ containerContext: SamzaContainerContext,
taskInstances: Map[TaskName, TaskInstance],
runLoop: RunLoop,
consumerMultiplexer: SystemConsumers,
producerMultiplexer: SystemProducers,
metrics: SamzaContainerMetrics,
offsetManager: OffsetManager = new OffsetManager,
+ localityManager: LocalityManager = null,
reporters: Map[String, MetricsReporter] = Map(),
jvm: JvmMetrics = null) extends Runnable with Logging {
@@ -558,6 +563,7 @@ class SamzaContainer(
startMetrics
startOffsetManager
+ startLocalityManager
startStores
startProducers
startTask
@@ -576,6 +582,7 @@ class SamzaContainer(
shutdownTask
shutdownProducers
shutdownStores
+ shutdownLocalityManager
shutdownOffsetManager
shutdownMetrics
@@ -612,6 +619,25 @@ class SamzaContainer(
offsetManager.start
}
+ def startLocalityManager {
+ if (localityManager != null) {
+ info("Registering localityManager for the container")
+ localityManager.start
+ localityManager.register(String.valueOf(containerContext.id))
+
+ info("Writing container locality to Coordinator Stream")
+ try {
+ val hostInetAddress = InetAddress.getLocalHost.getHostAddress
+ localityManager.writeContainerToHostMapping(containerContext.id, hostInetAddress)
+ } catch {
+ case uhe: UnknownHostException =>
+ warn("Received UnknownHostException when persisting locality info for container %d: %s" format (containerContext.id, uhe.getMessage)) //No-op
+ case unknownException: Throwable =>
+ warn("Received an exception when persisting locality info for container %d: %s" format (containerContext.id, unknownException.getMessage))
+ }
+ }
+ }
+
def startStores {
info("Starting task instance stores.")
@@ -668,6 +694,13 @@ class SamzaContainer(
taskInstances.values.foreach(_.shutdownStores)
}
+ def shutdownLocalityManager {
+ if (localityManager != null) {
+ info("Shutting down locality manager.")
+ localityManager.stop
+ }
+ }
+
def shutdownOffsetManager {
info("Shutting down offset manager.")
http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 5b43b58..8ee034a 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -26,7 +26,7 @@ import org.apache.samza.SamzaException
import org.apache.samza.container.grouper.task.GroupByContainerCount
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
import java.util
-import org.apache.samza.container.TaskName
+import org.apache.samza.container.{LocalityManager, TaskName}
import org.apache.samza.storage.ChangelogPartitionManager
import org.apache.samza.util.Logging
import org.apache.samza.metrics.MetricsRegistryMap
@@ -73,7 +73,8 @@ object JobCoordinator extends Logging {
info("Got config: %s" format config)
val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
- getJobCoordinator(rewriteConfig(config), checkpointManager, changelogManager)
+ val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
+ getJobCoordinator(rewriteConfig(config), checkpointManager, changelogManager, localityManager)
}
def apply(coordinatorSystemConfig: Config): JobCoordinator = apply(coordinatorSystemConfig, new MetricsRegistryMap())
@@ -81,9 +82,12 @@ object JobCoordinator extends Logging {
/**
* Build a JobCoordinator using a Samza job's configuration.
*/
- def getJobCoordinator(config: Config, checkpointManager: CheckpointManager, changelogManager: ChangelogPartitionManager) = {
+ def getJobCoordinator(config: Config,
+ checkpointManager: CheckpointManager,
+ changelogManager: ChangelogPartitionManager,
+ localityManager: LocalityManager) = {
val containerCount = config.getContainerCount
- val jobModelGenerator = initializeJobModel(config, containerCount, checkpointManager, changelogManager)
+ val jobModelGenerator = initializeJobModel(config, containerCount, checkpointManager, changelogManager, localityManager)
val server = new HttpServer
server.addServlet("/*", new JobServlet(jobModelGenerator))
new JobCoordinator(jobModelGenerator(), server, checkpointManager)
@@ -157,7 +161,8 @@ object JobCoordinator extends Logging {
private def initializeJobModel(config: Config,
containerCount: Int,
checkpointManager: CheckpointManager,
- changelogManager: ChangelogPartitionManager): () => JobModel = {
+ changelogManager: ChangelogPartitionManager,
+ localityManager: LocalityManager): () => JobModel = {
// TODO containerCount should go away when we generalize the job coordinator,
// and have a non-yarn-specific way of specifying container count.
@@ -168,24 +173,31 @@ object JobCoordinator extends Logging {
val groups = grouper.group(allSystemStreamPartitions)
// Initialize the ChangelogPartitionManager and the CheckpointManager
- val previousChangelogeMapping = if (changelogManager != null)
+ val previousChangelogMapping = if (changelogManager != null)
{
- changelogManager.start
- changelogManager.readChangeLogPartitionMapping
+ changelogManager.start()
+ changelogManager.readChangeLogPartitionMapping()
}
else
{
- new util.HashMap[TaskName, java.lang.Integer]()
+ new util.HashMap[TaskName, Integer]()
}
- checkpointManager.start
+
+ checkpointManager.start()
groups.foreach(taskSSP => checkpointManager.register(taskSSP._1))
+ // We don't need to register() localityManager separately, as it shares the same underlying instances with the checkpoint and changelog managers.
+ // TODO: This code will go away with refactoring - SAMZA-678
+
+ localityManager.start()
+
// Generate the jobModel
def jobModelGenerator(): JobModel = refreshJobModel(config,
allSystemStreamPartitions,
checkpointManager,
groups,
- previousChangelogeMapping,
+ previousChangelogMapping,
+ localityManager,
containerCount)
val jobModel = jobModelGenerator()
@@ -194,14 +206,14 @@ object JobCoordinator extends Logging {
if (changelogManager != null)
{
// newChangelogMapping is the merging of all current task:changelog
- // assignments with whatever we had before (previousChangelogeMapping).
+ // assignments with whatever we had before (previousChangelogMapping).
// We must persist legacy changelog assignments so that
// maxChangelogPartitionId always has the absolute max, not the current
// max (in case the task with the highest changelog partition mapping
// disappears).
val newChangelogMapping = jobModel.getContainers.flatMap(_._2.getTasks).map{case (taskName,taskModel) => {
taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
- }}.toMap ++ previousChangelogeMapping
+ }}.toMap ++ previousChangelogMapping
info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
changelogManager.writeChangeLogPartitionMapping(newChangelogMapping)
}
@@ -219,13 +231,14 @@ object JobCoordinator extends Logging {
allSystemStreamPartitions: util.Set[SystemStreamPartition],
checkpointManager: CheckpointManager,
groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
- previousChangelogeMapping: util.Map[TaskName, Integer],
+ previousChangelogMapping: util.Map[TaskName, Integer],
+ localityManager: LocalityManager,
containerCount: Int): JobModel = {
this.synchronized
{
// If no mappings are present (first time the job is running) we return -1; this allows 0 to be the first
// changelog partition mapping.
- var maxChangelogPartitionId = previousChangelogeMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+ var maxChangelogPartitionId = previousChangelogMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
// Assign all SystemStreamPartitions to TaskNames.
val taskModels =
@@ -235,7 +248,7 @@ object JobCoordinator extends Logging {
val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]()))
// Find the system partitions which don't have a checkpoint and set null for the values for offsets
val offsetMap = systemStreamPartitions.map(ssp => (ssp -> null)).toMap ++ checkpoint.getOffsets
- val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match
+ val changelogPartition = Option(previousChangelogMapping.get(taskName)) match
{
case Some(changelogPartitionId) => new Partition(changelogPartitionId)
case _ =>
@@ -255,7 +268,16 @@ object JobCoordinator extends Logging {
val containerModels = containerGrouper.group(taskModels).map
{ case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
- new JobModel(config, containerModels)
+ val containerLocality = if (localityManager != null) {
+ localityManager.readContainerLocality()
+ } else {
+ new util.HashMap[Integer, String]()
+ }
+
+ containerLocality.foreach { case (container: Integer, location: String) =>
+ info("Container id %d --> %s" format (container.intValue(), location))
+ }
+ new JobModel(config, containerModels, containerLocality)
}
}
}
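The coordinator's read path in a nutshell (a Java sketch of the Scala above; containerModels assumed in scope):

    Map<Integer, String> containerLocality = localityManager != null
        ? localityManager.readContainerLocality()
        : new HashMap<Integer, String>();
    JobModel jobModel = new JobModel(config, containerModels, containerLocality);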
http://git-wip-us.apache.org/repos/asf/samza/blob/f320a430/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a7fa085..9fb1aa9 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -161,11 +161,13 @@ class TestSamzaContainer extends AssertionsForJUnit {
consumerMultiplexer = consumerMultiplexer,
metrics = new SamzaContainerMetrics)
val container = new SamzaContainer(
- Map(taskName -> taskInstance),
- runLoop,
- consumerMultiplexer,
- producerMultiplexer,
- new SamzaContainerMetrics)
+ containerContext = containerContext,
+ taskInstances = Map(taskName -> taskInstance),
+ runLoop = runLoop,
+ consumerMultiplexer = consumerMultiplexer,
+ producerMultiplexer = producerMultiplexer,
+ metrics = new SamzaContainerMetrics
+ )
try {
container.run
fail("Expected exception to be thrown in run method.")