You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/07/28 02:49:08 UTC

samza git commit: SAMZA-717: expose the TaskNameGrouper API

Repository: samza
Updated Branches:
  refs/heads/master f49d2c938 -> 2554669ea


SAMZA-717: expose the TaskNameGrouper API


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2554669e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2554669e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2554669e

Branch: refs/heads/master
Commit: 2554669eaf6417683f5cab4dcb12483f2a9955a1
Parents: f49d2c9
Author: József Márton Jung <j....@levi9.com>
Authored: Mon Jul 27 17:48:02 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Jul 27 17:48:02 2015 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  9 ++++
 .../container/grouper/task/TaskNameGrouper.java | 53 ++++++++++++++++++++
 .../grouper/task/TaskNameGrouperFactory.java    | 35 +++++++++++++
 .../apache/samza/job/model/ContainerModel.java  |  2 +-
 .../org/apache/samza/job/model/TaskModel.java   |  2 +-
 .../org/apache/samza/config/TaskConfig.scala    | 15 +++++-
 .../grouper/task/GroupByContainerCount.scala    | 10 ++--
 .../task/GroupByContainerCountFactory.scala     | 30 +++++++++++
 .../grouper/task/TaskNameGrouper.scala          | 51 -------------------
 .../samza/coordinator/JobCoordinator.scala      | 23 ++++-----
 .../samza/storage/TestStorageRecovery.java      |  1 +
 .../samza/checkpoint/TestCheckpointTool.scala   |  3 +-
 .../task/TestGroupByContainerCount.scala        | 13 +++--
 .../samza/coordinator/TestJobCoordinator.scala  |  6 ++-
 14 files changed, 171 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 6aa7b91..ea73b40 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -436,6 +436,15 @@
                         This property controls how long the Samza container will wait for an orderly shutdown of task instances.
                     </td>
                 </tr>
+                <tr>
+                    <td class="property" id="task-name-grouper-factory">task.name.grouper.factory</td>
+                    <td class="default">org.apache.samza.<br>container.grouper.task.<br>GroupByContainerCountFactory</td>
+                    <td class="description">
+                        The fully-qualified name of the Java class that implements TaskNameGrouperFactory; that factory builds the TaskNameGrouper which assigns tasks to containers.
+                        The default configuration value if the property is not present is <code>task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerCountFactory</code>.<br>
+                        To customize how tasks are grouped into containers, set this property to your own TaskNameGrouperFactory implementation.
+                    </td>
+                </tr>
 
                 <tr>
                     <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
new file mode 100644
index 0000000..59a3237
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+
+import java.util.Set;
+
+/**
+ * <p>
+ * Second stage of job planning: once an implementation of
+ * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}
+ * has assigned the input SystemStreamPartitions to tasks, a TaskNameGrouper
+ * decides which SamzaContainer each of those tasks runs in. Implementations
+ * receive a set of TaskModels and partition it into ContainerModels; all tasks
+ * placed in the same ContainerModel are executed by the same SamzaContainer.
+ * </p>
+ *
+ * <p>
+ * Strategies range from the trivial (one task per container) to ones that weigh
+ * data locality, anti-affinity or an even spread of expected bandwidth.
+ * </p>
+ * <p>
+ * The grouper in use is created by the {@link TaskNameGrouperFactory} named in
+ * the {@code task.name.grouper.factory} configuration property.
+ * </p>
+ */
+public interface TaskNameGrouper {
+  /**
+   * Packs the given tasks into the containers that will host them.
+   *
+   * @param tasks the tasks to distribute across containers
+   * @return the containers, which together hold all of the tasks passed in
+   */
+  Set<ContainerModel> group(Set<TaskModel> tasks);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
new file mode 100644
index 0000000..8b967b7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import org.apache.samza.config.Config;
+
+/**
+ * Factory for the {@link TaskNameGrouper}; selected via {@code task.name.grouper.factory}.
+ */
+public interface TaskNameGrouperFactory {
+  /**
+   * Builds a {@link TaskNameGrouper}. Implementations may read whatever settings they
+   * need from {@code config} in order to construct the grouper.
+   *
+   * @param config the job configuration from which grouper settings are read
+   * @return the {@link TaskNameGrouper} implementation to use for the job
+   */
+  TaskNameGrouper build(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
index 98a34bc..ed721b1 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
@@ -29,7 +29,7 @@ import org.apache.samza.container.TaskName;
  * process. The model is used in the job coordinator and SamzaContainer to
  * determine how to execute Samza jobs.
  * </p>
- * 
+ *
  * <p>
  * The hierarchy for a Samza's job data model is that jobs have containers, and
  * containers have tasks. Each data model contains relevant information, such as

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
index c26690a..e00c49d 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -32,7 +32,7 @@ import org.apache.samza.system.SystemStreamPartition;
  * The data model used to represent a task. The model is used in the job
  * coordinator and SamzaContainer to determine how to execute Samza jobs.
  * </p>
- * 
+ *
  * <p>
  * The hierarchy for a Samza's job data model is that jobs have containers, and
  * containers have tasks. Each data model contains relevant information, such as

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 0b3a235..51e9e99 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -20,7 +20,7 @@
 package org.apache.samza.config
 
 import org.apache.samza.system.SystemStream
-import org.apache.samza.util.Util
+import org.apache.samza.util.{Logging, Util}
 
 object TaskConfig {
   // task config constants
@@ -37,6 +37,7 @@ object TaskConfig {
   val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
   val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
   val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window
+  val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper
 
   /**
    * Samza's container polls for more messages under two conditions. The first
@@ -58,7 +59,7 @@ object TaskConfig {
   implicit def Config2Task(config: Config) = new TaskConfig(config)
 }
 
-class TaskConfig(config: Config) extends ScalaMapConfig(config) {
+class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getInputStreams = getOption(TaskConfig.INPUT_STREAMS) match {
     case Some(streams) => if (streams.length > 0) {
       streams.split(",").map(systemStreamNames => {
@@ -104,4 +105,14 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
   def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
 
   def getIgnoredExceptions = getOption(TaskConfig.IGNORED_EXCEPTIONS)
+
+  def getTaskNameGrouperFactory = {
+    val defaultFactory = "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
+    getOption(TaskConfig.GROUPER_FACTORY).getOrElse {
+      // Not configured: log the fallback and use the container-count based grouper factory.
+      info("No %s configuration, using '%s'" format (TaskConfig.GROUPER_FACTORY, defaultFactory))
+      defaultFactory
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
index be36125..cb0a3bd 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
@@ -22,6 +22,7 @@ package org.apache.samza.container.grouper.task
 import org.apache.samza.job.model.TaskModel
 import org.apache.samza.job.model.ContainerModel
 import scala.collection.JavaConversions._
+import java.util
 
 /**
  * Group the SSP taskNames by dividing the number of taskNames into the number
@@ -34,11 +35,10 @@ import scala.collection.JavaConversions._
 class GroupByContainerCount(numContainers: Int) extends TaskNameGrouper {
   require(numContainers > 0, "Must have at least one container")
 
-  override def group(tasks: Set[TaskModel]): Set[ContainerModel] = {
+  override def group(tasks: util.Set[TaskModel]): util.Set[ContainerModel] = {
     require(tasks.size > 0, "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.")
     require(tasks.size >= numContainers, "Your container count (%s) is larger than your task count (%s). Can't have containers with nothing to do, so aborting." format (numContainers, tasks.size))
-
-    tasks
+    setAsJavaSet(tasks
       .toList
       // Sort tasks by taskName.
       .sortWith { case (task1, task2) => task1.compareTo(task2) < 0 }
@@ -48,8 +48,8 @@ class GroupByContainerCount(numContainers: Int) extends TaskNameGrouper {
       .groupBy(_._2 % numContainers)
       // Take just TaskModel and remove task IDs.
       .mapValues(_.map { case (task, taskId) => (task.getTaskName, task) }.toMap)
-      .map { case (containerId, tasks) => new ContainerModel(containerId, tasks) }
-      .toSet
+      .map { case (containerId, taskModels) => new ContainerModel(containerId, taskModels) }
+      .toSet)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala
new file mode 100644
index 0000000..eca6215
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task
+
+import org.apache.samza.config.{JobConfig, Config}
+
+/**
+ * Builds a GroupByContainerCount sized by JobConfig.getContainerCount, i.e. the same lookup the
+ * job coordinator performed before grouping became pluggable, instead of reading
+ * job.container.count directly (which would bypass any fallback getContainerCount applies).
+ */
+class GroupByContainerCountFactory extends TaskNameGrouperFactory {
+  override def build(config: Config): TaskNameGrouper = new GroupByContainerCount(new JobConfig(config).getContainerCount)
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
deleted file mode 100644
index 62e94ea..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.grouper.task
-
-import org.apache.samza.job.model.TaskModel
-import org.apache.samza.job.model.ContainerModel
-
-/**
- * <p>
- * After the input SystemStreamPartitions have been mapped to their tasks by an
- * implementation of
- * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}
- * , we can then map those groupings into the
- * {@link org.apache.samza.container.SamzaContainer}s on which they will run.
- * This class takes a set of TaskModels and groups them together into
- * ContainerModels. All tasks within a single ContainerModel will be executed in
- * a single SamzaContainer.
- * </p>
- *
- * <p>
- * A simple implementation could assign each TaskModel to a separate container.
- * More advanced implementations could examine the TaskModel to group them by
- * data locality, anti-affinity, even distribution of expected bandwidth
- * consumption, etc.
- * </p>
- */
-trait TaskNameGrouper {
-  /**
-   * Group tasks into the containers they will share.
-   *
-   * @param tasks Set of tasks to group into containers.
-   * @return Set of containers, which contain the tasks that were passed in.
-   */
-  def group(tasks: Set[TaskModel]): Set[ContainerModel]
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index f621611..0dbf14b 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -20,10 +20,10 @@
 package org.apache.samza.coordinator
 
 
-import org.apache.samza.config.Config
+import org.apache.samza.config.{Config, ConfigRewriter}
 import org.apache.samza.job.model.{JobModel, TaskModel}
 import org.apache.samza.SamzaException
-import org.apache.samza.container.grouper.task.GroupByContainerCount
+import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
 import java.util
 import org.apache.samza.container.{LocalityManager, TaskName}
@@ -43,7 +43,6 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.coordinator.server.JobServlet
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
-import org.apache.samza.config.ConfigRewriter
 
 /**
  * Helper companion object that is responsible for wiring up a JobCoordinator
@@ -84,8 +83,7 @@ object JobCoordinator extends Logging {
                         checkpointManager: CheckpointManager,
                         changelogManager: ChangelogPartitionManager,
                         localityManager: LocalityManager) = {
-    val containerCount = config.getContainerCount
-    val jobModelGenerator = initializeJobModel(config, containerCount, checkpointManager, changelogManager, localityManager)
+    val jobModelGenerator = initializeJobModel(config, checkpointManager, changelogManager, localityManager)
     val server = new HttpServer
     server.addServlet("/*", new JobServlet(jobModelGenerator))
     new JobCoordinator(jobModelGenerator(), server, checkpointManager)
@@ -157,12 +155,9 @@ object JobCoordinator extends Logging {
    * which catchup with the latest content from the coordinator stream.
    */
   private def initializeJobModel(config: Config,
-                                 containerCount: Int,
                                  checkpointManager: CheckpointManager,
                                  changelogManager: ChangelogPartitionManager,
                                  localityManager: LocalityManager): () => JobModel = {
-    // TODO containerCount should go away when we generalize the job coordinator,
-    // and have a non-yarn-specific way of specifying container count.
 
     // Do grouping to fetch TaskName to SSP mapping
     val allSystemStreamPartitions = getInputStreamPartitions(config)
@@ -195,8 +190,7 @@ object JobCoordinator extends Logging {
                                                         checkpointManager,
                                                         groups,
                                                         previousChangelogMapping,
-                                                        localityManager,
-                                                        containerCount)
+                                                        localityManager)
 
     val jobModel = jobModelGenerator()
 
@@ -230,8 +224,7 @@ object JobCoordinator extends Logging {
                               checkpointManager: CheckpointManager,
                               groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
                               previousChangelogMapping: util.Map[TaskName, Integer],
-                              localityManager: LocalityManager,
-                              containerCount: Int): JobModel = {
+                              localityManager: LocalityManager): JobModel = {
     this.synchronized
     {
       // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
@@ -270,8 +263,10 @@ object JobCoordinator extends Logging {
 
       // Here is where we should put in a pluggable option for the
       // SSPTaskNameGrouper for locality, load-balancing, etc.
-      val containerGrouper = new GroupByContainerCount(containerCount)
-      val containerModels = containerGrouper.group(taskModels).map
+
+      val containerGrouperFactory = Util.getObj[TaskNameGrouperFactory](config.getTaskNameGrouperFactory)
+      val containerGrouper = containerGrouperFactory.build(config)
+      val containerModels = asScalaSet(containerGrouper.group(setAsJavaSet(taskModels))).map
               { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
 
       new JobModel(config, containerModels, localityManager)

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
index b8ae592..52d450d 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
@@ -93,6 +93,7 @@ public class TestStorageRecovery {
     map.put("task.inputs", "mockSystem.input");
     map.put("job.coordinator.system", "coordinator");
     map.put("systems.coordinator.samza.factory", MockCoordinatorStreamSystemFactory.class.getCanonicalName());
+    map.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerCountFactory");
     config = new MapConfig(map);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 0ba932c..00b8977 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -68,7 +68,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
       TaskConfig.INPUT_STREAMS -> "test.foo",
       TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
       SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName,
-      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
+      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+      TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
     ))
     val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
       new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
index 2c7cb28..ddf1fde 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
@@ -18,25 +18,28 @@
  */
 package org.apache.samza.container.grouper.task
 
+import java.util
+
 import org.apache.samza.container.TaskName
 import org.apache.samza.system.SystemStreamPartition
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.job.model.TaskModel
 import org.apache.samza.Partition
-import scala.collection.JavaConversions
 import org.scalatest.Assertions.intercept
 import scala.collection.JavaConversions._
 
 class TestGroupByContainerCount {
   @Test
   def testEmptyTasks {
-    intercept[IllegalArgumentException] { new GroupByContainerCount(1).group(Set()) }
+    intercept[IllegalArgumentException] { new GroupByContainerCount(1).group(new util.HashSet()) }
   }
 
   @Test
   def testFewerTasksThanContainers {
-    intercept[IllegalArgumentException] { new GroupByContainerCount(2).group(Set(null)) }
+    val taskModels = new util.HashSet[TaskModel]()
+    taskModels.add(getTaskModel("1", 1))
+    intercept[IllegalArgumentException] { new GroupByContainerCount(2).group(taskModels) }
   }
 
   @Test
@@ -47,8 +50,8 @@ class TestGroupByContainerCount {
       getTaskModel("3", 3),
       getTaskModel("4", 4),
       getTaskModel("5", 5))
-    val containers = new GroupByContainerCount(2)
-      .group(taskModels)
+    val containers = asScalaSet(new GroupByContainerCount(2)
+      .group(setAsJavaSet(taskModels)))
       .map(containerModel => containerModel.getContainerId -> containerModel)
       .toMap
     assertEquals(2, containers.size)

http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index d9ae187..4097ac7 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -101,7 +101,8 @@ class TestJobCoordinator {
       JobConfig.JOB_CONTAINER_COUNT -> "2",
       TaskConfig.INPUT_STREAMS -> "test.stream1",
       SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
-      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
+      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+      TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
       )
 
     // We want the mocksystemconsumer to use the same instance across runs
@@ -161,7 +162,8 @@ class TestJobCoordinator {
       JobConfig.JOB_CONTAINER_COUNT -> "2",
       TaskConfig.INPUT_STREAMS -> "test.stream1",
       SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
-      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
+      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+      TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
       )
 
     // Enable caching on MockConsumer to add more messages later