You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/07/28 02:49:08 UTC
samza git commit: SAMZA-717: expose the TaskNameGrouper API
Repository: samza
Updated Branches:
refs/heads/master f49d2c938 -> 2554669ea
SAMZA-717: expose the TaskNameGrouper API
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2554669e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2554669e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2554669e
Branch: refs/heads/master
Commit: 2554669eaf6417683f5cab4dcb12483f2a9955a1
Parents: f49d2c9
Author: József Márton Jung <j....@levi9.com>
Authored: Mon Jul 27 17:48:02 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Jul 27 17:48:02 2015 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 9 ++++
.../container/grouper/task/TaskNameGrouper.java | 53 ++++++++++++++++++++
.../grouper/task/TaskNameGrouperFactory.java | 35 +++++++++++++
.../apache/samza/job/model/ContainerModel.java | 2 +-
.../org/apache/samza/job/model/TaskModel.java | 2 +-
.../org/apache/samza/config/TaskConfig.scala | 15 +++++-
.../grouper/task/GroupByContainerCount.scala | 10 ++--
.../task/GroupByContainerCountFactory.scala | 30 +++++++++++
.../grouper/task/TaskNameGrouper.scala | 51 -------------------
.../samza/coordinator/JobCoordinator.scala | 23 ++++-----
.../samza/storage/TestStorageRecovery.java | 1 +
.../samza/checkpoint/TestCheckpointTool.scala | 3 +-
.../task/TestGroupByContainerCount.scala | 13 +++--
.../samza/coordinator/TestJobCoordinator.scala | 6 ++-
14 files changed, 171 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 6aa7b91..ea73b40 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -436,6 +436,15 @@
This property controls how long the Samza container will wait for an orderly shutdown of task instances.
</td>
</tr>
+ <tr>
+ <td class="property" id="task-name-grouper-factory">task.name.grouper.factory</td>
+ <td class="default">org.apache.samza.<br>container.grouper.task.<br>GroupByContainerCountFactory</td>
+ <td class="description">
+ The fully-qualified name of the Java class implementing TaskNameGrouperFactory; this factory builds the TaskNameGrouper that assigns tasks to containers.
+ The default configuration value if the property is not present is <code>task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerCountFactory</code>.<br>
+ Users can supply a custom TaskNameGrouperFactory implementation to apply their own logic for grouping tasks into containers.
+ </td>
+ </tr>
<tr>
<th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
new file mode 100644
index 0000000..59a3237
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+
+import java.util.Set;
+
+/**
+ * <p>
+ * After the input SystemStreamPartitions have been mapped to their tasks by an
+ * implementation of
+ * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper},
+ * we can then map those groupings onto the
+ * SamzaContainers on which they will run.
+ * This interface takes a set of TaskModels and groups them together into
+ * ContainerModels. All tasks within a single ContainerModel will be executed in
+ * a single SamzaContainer.
+ * </p>
+ *
+ * <p>
+ * A simple implementation could assign each TaskModel to a separate container.
+ * More advanced implementations could examine the TaskModels to group them by
+ * data locality, anti-affinity, even distribution of expected bandwidth
+ * consumption, etc. Implementations are created by a {@link TaskNameGrouperFactory}.
+ * </p>
+ */
+public interface TaskNameGrouper {
+ /**
+ * Groups tasks into the containers that will run them.
+ *
+ * @param tasks the set of tasks to group into containers.
+ * @return the set of containers which, together, contain the tasks that were passed in.
+ */
+ Set<ContainerModel> group(Set<TaskModel> tasks);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
new file mode 100644
index 0000000..8b967b7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import org.apache.samza.config.Config;
+
+/**
+ * Factory for building a {@link TaskNameGrouper}; the implementation class is named by task.name.grouper.factory.
+ */
+public interface TaskNameGrouperFactory {
+ /**
+ * Builds a {@link TaskNameGrouper}. The config can be used to read any values needed in the
+ * process of building the {@link TaskNameGrouper}.
+ *
+ * @param config the job configuration, from which the values needed to build a {@link TaskNameGrouper} can be read
+ * @return a {@link TaskNameGrouper} implementation
+ */
+ TaskNameGrouper build(Config config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
index 98a34bc..ed721b1 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
@@ -29,7 +29,7 @@ import org.apache.samza.container.TaskName;
* process. The model is used in the job coordinator and SamzaContainer to
* determine how to execute Samza jobs.
* </p>
- *
+ *
* <p>
* The hierarchy for a Samza's job data model is that jobs have containers, and
* containers have tasks. Each data model contains relevant information, such as
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
index c26690a..e00c49d 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -32,7 +32,7 @@ import org.apache.samza.system.SystemStreamPartition;
* The data model used to represent a task. The model is used in the job
* coordinator and SamzaContainer to determine how to execute Samza jobs.
* </p>
- *
+ *
* <p>
* The hierarchy for a Samza's job data model is that jobs have containers, and
* containers have tasks. Each data model contains relevant information, such as
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 0b3a235..51e9e99 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -20,7 +20,7 @@
package org.apache.samza.config
import org.apache.samza.system.SystemStream
-import org.apache.samza.util.Util
+import org.apache.samza.util.{Logging, Util}
object TaskConfig {
// task config constants
@@ -37,6 +37,7 @@ object TaskConfig {
val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window
+ val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper
/**
* Samza's container polls for more messages under two conditions. The first
@@ -58,7 +59,7 @@ object TaskConfig {
implicit def Config2Task(config: Config) = new TaskConfig(config)
}
-class TaskConfig(config: Config) extends ScalaMapConfig(config) {
+class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getInputStreams = getOption(TaskConfig.INPUT_STREAMS) match {
case Some(streams) => if (streams.length > 0) {
streams.split(",").map(systemStreamNames => {
@@ -104,4 +105,14 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
def getIgnoredExceptions = getOption(TaskConfig.IGNORED_EXCEPTIONS)
+
+ def getTaskNameGrouperFactory = { // class name of the TaskNameGrouperFactory configured via TaskConfig.GROUPER_FACTORY
+ getOption(TaskConfig.GROUPER_FACTORY) match {
+ case Some(grouperFactory) => grouperFactory
+ case _ => // not configured: log it and fall back to the default GroupByContainerCountFactory
+ info("No %s configuration, using 'org.apache.samza.container.grouper.task.GroupByContainerCountFactory'" format TaskConfig.GROUPER_FACTORY)
+ "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
index be36125..cb0a3bd 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
@@ -22,6 +22,7 @@ package org.apache.samza.container.grouper.task
import org.apache.samza.job.model.TaskModel
import org.apache.samza.job.model.ContainerModel
import scala.collection.JavaConversions._
+import java.util
/**
* Group the SSP taskNames by dividing the number of taskNames into the number
@@ -34,11 +35,10 @@ import scala.collection.JavaConversions._
class GroupByContainerCount(numContainers: Int) extends TaskNameGrouper {
require(numContainers > 0, "Must have at least one container")
- override def group(tasks: Set[TaskModel]): Set[ContainerModel] = {
+ override def group(tasks: util.Set[TaskModel]): util.Set[ContainerModel] = {
require(tasks.size > 0, "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.")
require(tasks.size >= numContainers, "Your container count (%s) is larger than your task count (%s). Can't have containers with nothing to do, so aborting." format (numContainers, tasks.size))
-
- tasks
+ setAsJavaSet(tasks
.toList
// Sort tasks by taskName.
.sortWith { case (task1, task2) => task1.compareTo(task2) < 0 }
@@ -48,8 +48,8 @@ class GroupByContainerCount(numContainers: Int) extends TaskNameGrouper {
.groupBy(_._2 % numContainers)
// Take just TaskModel and remove task IDs.
.mapValues(_.map { case (task, taskId) => (task.getTaskName, task) }.toMap)
- .map { case (containerId, tasks) => new ContainerModel(containerId, tasks) }
- .toSet
+ .map { case (containerId, taskModels) => new ContainerModel(containerId, taskModels) }
+ .toSet)
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala
new file mode 100644
index 0000000..eca6215
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task
+
+import org.apache.samza.config.{JobConfig, Config}
+
+/**
+ * Factory that builds a GroupByContainerCount grouper sized by JobConfig.JOB_CONTAINER_COUNT (1 when unset).
+ */
+class GroupByContainerCountFactory extends TaskNameGrouperFactory {
+ override def build(config: Config): TaskNameGrouper = {
+ new GroupByContainerCount(config.getInt(JobConfig.JOB_CONTAINER_COUNT, 1))
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
deleted file mode 100644
index 62e94ea..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.grouper.task
-
-import org.apache.samza.job.model.TaskModel
-import org.apache.samza.job.model.ContainerModel
-
-/**
- * <p>
- * After the input SystemStreamPartitions have been mapped to their tasks by an
- * implementation of
- * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}
- * , we can then map those groupings into the
- * {@link org.apache.samza.container.SamzaContainer}s on which they will run.
- * This class takes a set of TaskModels and groups them together into
- * ContainerModels. All tasks within a single ContainerModel will be executed in
- * a single SamzaContainer.
- * </p>
- *
- * <p>
- * A simple implementation could assign each TaskModel to a separate container.
- * More advanced implementations could examine the TaskModel to group them by
- * data locality, anti-affinity, even distribution of expected bandwidth
- * consumption, etc.
- * </p>
- */
-trait TaskNameGrouper {
- /**
- * Group tasks into the containers they will share.
- *
- * @param tasks Set of tasks to group into containers.
- * @return Set of containers, which contain the tasks that were passed in.
- */
- def group(tasks: Set[TaskModel]): Set[ContainerModel]
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index f621611..0dbf14b 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -20,10 +20,10 @@
package org.apache.samza.coordinator
-import org.apache.samza.config.Config
+import org.apache.samza.config.{Config, ConfigRewriter}
import org.apache.samza.job.model.{JobModel, TaskModel}
import org.apache.samza.SamzaException
-import org.apache.samza.container.grouper.task.GroupByContainerCount
+import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
import java.util
import org.apache.samza.container.{LocalityManager, TaskName}
@@ -43,7 +43,6 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
import org.apache.samza.coordinator.server.JobServlet
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
-import org.apache.samza.config.ConfigRewriter
/**
* Helper companion object that is responsible for wiring up a JobCoordinator
@@ -84,8 +83,7 @@ object JobCoordinator extends Logging {
checkpointManager: CheckpointManager,
changelogManager: ChangelogPartitionManager,
localityManager: LocalityManager) = {
- val containerCount = config.getContainerCount
- val jobModelGenerator = initializeJobModel(config, containerCount, checkpointManager, changelogManager, localityManager)
+ val jobModelGenerator = initializeJobModel(config, checkpointManager, changelogManager, localityManager)
val server = new HttpServer
server.addServlet("/*", new JobServlet(jobModelGenerator))
new JobCoordinator(jobModelGenerator(), server, checkpointManager)
@@ -157,12 +155,9 @@ object JobCoordinator extends Logging {
* which catchup with the latest content from the coordinator stream.
*/
private def initializeJobModel(config: Config,
- containerCount: Int,
checkpointManager: CheckpointManager,
changelogManager: ChangelogPartitionManager,
localityManager: LocalityManager): () => JobModel = {
- // TODO containerCount should go away when we generalize the job coordinator,
- // and have a non-yarn-specific way of specifying container count.
// Do grouping to fetch TaskName to SSP mapping
val allSystemStreamPartitions = getInputStreamPartitions(config)
@@ -195,8 +190,7 @@ object JobCoordinator extends Logging {
checkpointManager,
groups,
previousChangelogMapping,
- localityManager,
- containerCount)
+ localityManager)
val jobModel = jobModelGenerator()
@@ -230,8 +224,7 @@ object JobCoordinator extends Logging {
checkpointManager: CheckpointManager,
groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
previousChangelogMapping: util.Map[TaskName, Integer],
- localityManager: LocalityManager,
- containerCount: Int): JobModel = {
+ localityManager: LocalityManager): JobModel = {
this.synchronized
{
// If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
@@ -270,8 +263,10 @@ object JobCoordinator extends Logging {
// Here is where we should put in a pluggable option for the
// SSPTaskNameGrouper for locality, load-balancing, etc.
- val containerGrouper = new GroupByContainerCount(containerCount)
- val containerModels = containerGrouper.group(taskModels).map
+
+ val containerGrouperFactory = Util.getObj[TaskNameGrouperFactory](config.getTaskNameGrouperFactory)
+ val containerGrouper = containerGrouperFactory.build(config)
+ val containerModels = asScalaSet(containerGrouper.group(setAsJavaSet(taskModels))).map
{ case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
new JobModel(config, containerModels, localityManager)
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
index b8ae592..52d450d 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
@@ -93,6 +93,7 @@ public class TestStorageRecovery {
map.put("task.inputs", "mockSystem.input");
map.put("job.coordinator.system", "coordinator");
map.put("systems.coordinator.samza.factory", MockCoordinatorStreamSystemFactory.class.getCanonicalName());
+ map.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerCountFactory");
config = new MapConfig(map);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 0ba932c..00b8977 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -68,7 +68,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
TaskConfig.INPUT_STREAMS -> "test.foo",
TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName,
- SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
+ SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+ TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
))
val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
index 2c7cb28..ddf1fde 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
@@ -18,25 +18,28 @@
*/
package org.apache.samza.container.grouper.task
+import java.util
+
import org.apache.samza.container.TaskName
import org.apache.samza.system.SystemStreamPartition
import org.junit.Assert._
import org.junit.Test
import org.apache.samza.job.model.TaskModel
import org.apache.samza.Partition
-import scala.collection.JavaConversions
import org.scalatest.Assertions.intercept
import scala.collection.JavaConversions._
class TestGroupByContainerCount {
@Test
def testEmptyTasks {
- intercept[IllegalArgumentException] { new GroupByContainerCount(1).group(Set()) }
+ intercept[IllegalArgumentException] { new GroupByContainerCount(1).group(new util.HashSet()) }
}
@Test
def testFewerTasksThanContainers {
- intercept[IllegalArgumentException] { new GroupByContainerCount(2).group(Set(null)) }
+ val taskModels = new util.HashSet[TaskModel]()
+ taskModels.add(getTaskModel("1", 1))
+ intercept[IllegalArgumentException] { new GroupByContainerCount(2).group(taskModels) }
}
@Test
@@ -47,8 +50,8 @@ class TestGroupByContainerCount {
getTaskModel("3", 3),
getTaskModel("4", 4),
getTaskModel("5", 5))
- val containers = new GroupByContainerCount(2)
- .group(taskModels)
+ val containers = asScalaSet(new GroupByContainerCount(2)
+ .group(setAsJavaSet(taskModels)))
.map(containerModel => containerModel.getContainerId -> containerModel)
.toMap
assertEquals(2, containers.size)
http://git-wip-us.apache.org/repos/asf/samza/blob/2554669e/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index d9ae187..4097ac7 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -101,7 +101,8 @@ class TestJobCoordinator {
JobConfig.JOB_CONTAINER_COUNT -> "2",
TaskConfig.INPUT_STREAMS -> "test.stream1",
SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
- SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
+ SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+ TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
)
// We want the mocksystemconsumer to use the same instance across runs
@@ -161,7 +162,8 @@ class TestJobCoordinator {
JobConfig.JOB_CONTAINER_COUNT -> "2",
TaskConfig.INPUT_STREAMS -> "test.stream1",
SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
- SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
+ SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+ TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
)
// Enable caching on MockConsumer to add more messages later