You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/01/22 23:08:50 UTC
samza git commit: StandbyTaskGenerator interface and a BuddyContainerBased implementation
Repository: samza
Updated Branches:
refs/heads/master 3a2010604 -> 1cac1c8e5
StandbyTaskGenerator interface and a BuddyContainerBased implementation
This PR adds:
* A StandbyTaskGenerator (implemented in this commit as TaskNameGrouperProxy) that populates standby tasks once the SSP and TaskName groupers have generated the task models.
* A BuddyContainerBased implementation of this generator that generates standby tasks and groups them into buddy containers that hold only standby tasks.
* JobModelManager changes to read task modes and to exclude standby tasks from the previous task assignment used for grouping.
* Tests for StandbyTaskGenerator.
Author: Ray Matharu <rm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #872 from rmatharu/standbygenerator
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1cac1c8e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1cac1c8e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1cac1c8e
Branch: refs/heads/master
Commit: 1cac1c8e5b8ec903dc88c6d60673080885c5f34a
Parents: 3a20106
Author: Ray Matharu <rm...@linkedin.com>
Authored: Tue Jan 22 15:08:45 2019 -0800
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Jan 22 15:08:45 2019 -0800
----------------------------------------------------------------------
.../grouper/task/GroupByContainerCount.java | 8 -
.../grouper/task/TaskAssignmentManager.java | 14 ++
.../grouper/task/TaskNameGrouperProxy.java | 151 +++++++++++++++++++
.../org/apache/samza/config/JobConfig.scala | 9 ++
.../samza/coordinator/JobModelManager.scala | 73 +++++----
.../grouper/task/TestGroupByContainerCount.java | 14 +-
.../grouper/task/TestTaskNameGrouperProxy.java | 123 +++++++++++++++
.../samza/coordinator/TestJobModelManager.java | 3 +-
8 files changed, 346 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 8a741db..23580f0 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -30,7 +30,6 @@ import java.util.Set;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.SamzaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -142,13 +141,6 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
*/
private List<TaskGroup> getPreviousContainers(GrouperMetadata grouperMetadata, int taskCount) {
Map<TaskName, String> taskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment();
- taskToContainerId.values().forEach(id -> {
- try {
- int intId = Integer.parseInt(id);
- } catch (NumberFormatException nfe) {
- throw new SamzaException("GroupByContainerCount cannot handle non-integer processorIds!", nfe);
- }
- });
+ // NOTE(review): previous processor IDs are no longer checked to be integers here; confirm that every
+ // caller only supplies integer IDs, or that any later parsing of these IDs handles other formats.
if (taskToContainerId.isEmpty()) {
LOG.info("No task assignment map was saved.");
http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index e151384..669ed57 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
@@ -77,6 +78,7 @@ public class TaskAssignmentManager {
* @param metricsRegistry the registry for reporting metrics.
* @param keySerde the key serializer.
* @param containerIdSerde the value serializer.
+ * @param taskModeSerde the task-mode serializer.
*/
public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry, Serde<String> keySerde, Serde<String> containerIdSerde, Serde<String> taskModeSerde) {
this.config = config;
@@ -108,6 +110,26 @@ public class TaskAssignmentManager {
return Collections.unmodifiableMap(new HashMap<>(taskNameToContainerId));
}
+ /**
+ * Reads all task modes stored in the task-mode mapping metadata store.
+ * Entries whose stored value deserializes to null are skipped.
+ *
+ * @return an unmodifiable map from task name to its {@link TaskMode}.
+ * @throws IllegalArgumentException if a stored value is not the name of a {@link TaskMode} constant.
+ */
+ public Map<TaskName, TaskMode> readTaskModes() {
+ Map<TaskName, TaskMode> taskModeMap = new HashMap<>();
+ taskModeMappingMetadataStore.all().forEach((taskName, valueBytes) -> {
+ String taskMode = taskModeSerde.fromBytes(valueBytes);
+ if (taskMode != null) {
+ taskModeMap.put(new TaskName(taskName), TaskMode.valueOf(taskMode));
+ }
+ LOG.debug("Task mode assignment for task {}: {}", taskName, taskMode);
+ });
+ // taskModeMap is a fresh local map that never escapes, so wrapping it directly suffices (no extra copy).
+ return Collections.unmodifiableMap(taskModeMap);
+ }
+
/**
* Method to write task container info to {@link MetadataStore}.
*
http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
new file mode 100644
index 0000000..c7d556a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This TaskNameGrouperProxy wraps around any given TaskNameGrouper.
+ * In addition to applying the provided grouper, it also generates
+ * Standby-tasks and adds them to separate dedicated containers.
+ * It adds (r-1) Standby-Tasks for each active task, where r is the replication factor.
+ * Hence it adds r-1 additional containers for each regular container.
+ * When standby tasks are disabled, or r is 1 or less, no standby tasks or containers are added.
+ *
+ * All Standby-tasks are assigned a TaskName with a "Standby" prefix.
+ * The new containers carrying Standby tasks are assigned containerIDs derived from their
+ * active container's ID, i.e., activeContainerID-replicaNumber.
+ * For example,
+ *
+ * If the initial container model map is:
+ *
+ * Container 0 : (Partition 0, Partition 1)
+ * Container 1 : (Partition 2, Partition 3)
+ * with replicationFactor = 3
+ *
+ * The generated containerModel map is:
+ * Container 0 : (Partition 0, Partition 1)
+ * Container 1 : (Partition 2, Partition 3)
+ * Container 0-0 : (Standby-Partition 0-0, Standby-Partition 1-0)
+ * Container 1-0 : (Standby-Partition 2-0, Standby-Partition 3-0)
+ * Container 0-1 : (Standby-Partition 0-1, Standby-Partition 1-1)
+ * Container 1-1 : (Standby-Partition 2-1, Standby-Partition 3-1)
+ */
+public class TaskNameGrouperProxy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskNameGrouperProxy.class);
+ private static final String CONTAINER_ID_SEPARATOR = "-";
+ private static final String TASKNAME_SEPARATOR = "-";
+ private static final String STANDBY_TASKNAME_PREFIX = "Standby";
+ private final TaskNameGrouper taskNameGrouper;
+ private final boolean standbyTasksEnabled;
+ private final int replicationFactor;
+
+ /**
+ * @param taskNameGrouper the grouper that assigns active tasks to containers.
+ * @param standbyTasksEnabled whether standby tasks and their buddy containers should be generated.
+ * @param replicationFactor the replication factor r; r-1 standby tasks are generated per active task.
+ */
+ public TaskNameGrouperProxy(TaskNameGrouper taskNameGrouper, boolean standbyTasksEnabled,
+ int replicationFactor) {
+ this.taskNameGrouper = taskNameGrouper;
+ this.standbyTasksEnabled = standbyTasksEnabled;
+ this.replicationFactor = replicationFactor;
+ }
+
+ /**
+ * Groups the tasks using the wrapped grouper and the historical grouper metadata, then, if standby
+ * tasks are enabled, adds standby tasks grouped into buddy containers.
+ *
+ * @param taskModels the active task models to group.
+ * @param grouperMetadata historical metadata passed through to the wrapped grouper.
+ * @return the container models, including buddy containers when standby tasks are enabled.
+ */
+ public Set<ContainerModel> group(Set<TaskModel> taskModels, GrouperMetadata grouperMetadata) {
+ if (this.standbyTasksEnabled) {
+ return generateStandbyTasks(this.taskNameGrouper.group(taskModels, grouperMetadata), replicationFactor);
+ } else {
+ return this.taskNameGrouper.group(taskModels, grouperMetadata);
+ }
+ }
+
+ /**
+ * Groups the tasks onto the given container IDs using the wrapped grouper, then, if standby tasks
+ * are enabled, adds standby tasks grouped into buddy containers.
+ *
+ * @param taskModels the active task models to group.
+ * @param containersIds the container IDs passed through to the wrapped grouper.
+ * @return the container models, including buddy containers when standby tasks are enabled.
+ */
+ public Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> containersIds) {
+ if (this.standbyTasksEnabled) {
+ return generateStandbyTasks(this.taskNameGrouper.group(taskModels, containersIds), replicationFactor);
+ } else {
+ return this.taskNameGrouper.group(taskModels, containersIds);
+ }
+ }
+
+ /**
+ * Generate a container model map with standby tasks added and grouped into buddy containers.
+ * Package-private for testing.
+ *
+ * @param containerModels The initial container model map.
+ * @param replicationFactor The desired replication factor n; n-1 standby tasks are added for each active task (none when n <= 1).
+ * @return The generated set of containerModels: the initial (active) containers plus the added buddy containers.
+ */
+ Set<ContainerModel> generateStandbyTasks(Set<ContainerModel> containerModels, int replicationFactor) {
+ LOG.info("Received current containerModel map : {}, replicationFactor : {}", containerModels, replicationFactor);
+ Set<ContainerModel> buddyContainers = new HashSet<>();
+
+ for (ContainerModel activeContainer : containerModels) {
+ for (int replicaNum = 0; replicaNum < replicationFactor - 1; replicaNum++) {
+ String buddyContainerId = getBuddyContainerId(activeContainer.getId(), replicaNum);
+
+ ContainerModel buddyContainerModel =
+ new ContainerModel(buddyContainerId, getTaskModelForBuddyContainer(activeContainer.getTasks(), replicaNum));
+
+ buddyContainers.add(buddyContainerModel);
+ }
+ }
+
+ LOG.info("Adding buddy containers : {}", buddyContainers);
+ buddyContainers.addAll(containerModels);
+ return buddyContainers;
+ }
+
+ // Helper method to populate the container model for a buddy container: one standby copy (same SSPs and
+ // changelog partition, mode Standby) of each task in the active container.
+ private static Map<TaskName, TaskModel> getTaskModelForBuddyContainer(
+ Map<TaskName, TaskModel> activeContainerTaskModel, int replicaNum) {
+ Map<TaskName, TaskModel> standbyTaskModels = new HashMap<>();
+
+ for (Map.Entry<TaskName, TaskModel> activeTask : activeContainerTaskModel.entrySet()) {
+ TaskName standbyTaskName = getStandbyTaskName(activeTask.getKey(), replicaNum);
+ TaskModel activeTaskModel = activeTask.getValue();
+ TaskModel standbyTaskModel = new TaskModel(standbyTaskName, activeTaskModel.getSystemStreamPartitions(),
+ activeTaskModel.getChangelogPartition(), TaskMode.Standby);
+ standbyTaskModels.put(standbyTaskName, standbyTaskModel);
+ }
+
+ LOG.info("Generated standbyTaskModels : {} for active task models : {}", standbyTaskModels,
+ activeContainerTaskModel);
+ return standbyTaskModels;
+ }
+
+ // Helper method to generate buddy containerIDs by appending the replica-number to the active-container's id.
+ private static String getBuddyContainerId(String activeContainerId, int replicaNumber) {
+ return activeContainerId.concat(CONTAINER_ID_SEPARATOR).concat(String.valueOf(replicaNumber));
+ }
+
+ // Helper method to get the standby task name: "Standby-" + active task name + "-" + replica number.
+ private static TaskName getStandbyTaskName(TaskName activeTaskName, int replicaNum) {
+ return new TaskName(STANDBY_TASKNAME_PREFIX.concat(TASKNAME_SEPARATOR)
+ .concat(activeTaskName.getTaskName())
+ .concat(TASKNAME_SEPARATOR)
+ .concat(String.valueOf(replicaNum)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 2120c49..5eae194 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -110,6 +110,12 @@ object JobConfig {
// Enables diagnostic appender for logging exception events
val JOB_DIAGNOSTICS_ENABLED = "job.diagnostics.enabled"
+ // Enables standby tasks (disabled by default)
+ val STANDBY_TASKS_ENABLED = "job.standbytasks.enabled"
+ // Replication factor r: r-1 standby tasks are generated for each active task
+ val STANDBY_TASKS_REPLICATION_FACTOR = "job.standbytasks.replication.factor"
+ val DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 2
+
// Specify DiagnosticAppender class
val DIAGNOSTICS_APPENDER_CLASS = "job.diagnostics.appender.class"
val DEFAULT_DIAGNOSTICS_APPENDER_CLASS = "org.apache.samza.logging.log4j.SimpleDiagnosticsAppender"
@@ -268,4 +274,10 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getSystemStreamPartitionMapperFactoryName: String = {
get(JobConfig.SYSTEM_STREAM_PARTITION_MAPPER_FACTORY, classOf[HashSystemStreamPartitionMapperFactory].getName)
}
+
+ /** Whether standby tasks are enabled (job.standbytasks.enabled); defaults to false. */
+ def getStandbyTasksEnabled: Boolean = getBoolean(JobConfig.STANDBY_TASKS_ENABLED, false)
+
+ /** The standby-task replication factor (job.standbytasks.replication.factor); defaults to 2. */
+ def getStandbyTaskReplicationFactor: Int = getInt(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, JobConfig.DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 5c983db..e49a61f 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -22,29 +22,24 @@ package org.apache.samza.coordinator
import java.util
import java.util.concurrent.atomic.AtomicReference
-import org.apache.samza.config._
+import org.apache.samza.Partition
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.Config
+import org.apache.samza.config.{Config, _}
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
import org.apache.samza.container.grouper.task._
-import org.apache.samza.container.LocalityManager
-import org.apache.samza.container.TaskName
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, TaskModel}
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system._
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
-import org.apache.samza.Partition
+import org.apache.samza.container.{LocalityManager, TaskName}
+import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, TaskModel}
+import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
import org.apache.samza.runtime.LocationId
+import org.apache.samza.system._
+import org.apache.samza.util.{Logging, Util}
-import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
* Helper companion object that is responsible for wiring up a JobModelManager
@@ -104,7 +99,13 @@ object JobModelManager extends Logging {
*/
def getGrouperMetadata(config: Config, localityManager: LocalityManager, taskAssignmentManager: TaskAssignmentManager) = {
val processorLocality: util.Map[String, LocationId] = getProcessorLocality(config, localityManager)
- val taskAssignment: util.Map[String, String] = taskAssignmentManager.readTaskAssignment()
+ val taskModes: util.Map[TaskName, TaskMode] = taskAssignmentManager.readTaskModes()
+
+ // Only active tasks' previous assignments are passed on; standby tasks (and tasks with no stored mode) are excluded
+ val taskAssignment: util.Map[String, String] = taskAssignmentManager.readTaskAssignment().
+ filterKeys(taskName => taskModes.get(new TaskName(taskName)) == TaskMode.Active)
+
+
val taskNameToProcessorId: util.Map[TaskName, String] = new util.HashMap[TaskName, String]()
for ((taskName, processorId) <- taskAssignment) {
taskNameToProcessorId.put(new TaskName(taskName), processorId)
@@ -153,27 +154,43 @@ object JobModelManager extends Logging {
* @param grouperMetadata provides the historical metadata of the application.
*/
def updateTaskAssignments(jobModel: JobModel, taskAssignmentManager: TaskAssignmentManager, grouperMetadata: GrouperMetadata): Unit = {
- val taskNames: util.Set[String] = new util.HashSet[String]()
+ val activeTaskNames: util.Set[String] = new util.HashSet[String]()
+ val standbyTaskNames: util.Set[String] = new util.HashSet[String]()
for (container <- jobModel.getContainers.values()) {
for (taskModel <- container.getTasks.values()) {
- taskNames.add(taskModel.getTaskName.getTaskName)
+ if (taskModel.getTaskMode == TaskMode.Active) {
+ activeTaskNames.add(taskModel.getTaskName.getTaskName)
+ }
+
+ if (taskModel.getTaskMode == TaskMode.Standby) {
+ standbyTaskNames.add(taskModel.getTaskName.getTaskName)
+ }
}
}
- val taskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment
- if (taskNames.size() != taskToContainerId.size()) {
- warn("Current task count {} does not match saved task count {}. Stateful jobs may observe misalignment of keys!",
- taskNames.size(), taskToContainerId.size())
+
+ val previousTaskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment
+ if (activeTaskNames.size() != previousTaskToContainerId.size()) {
+ warn("Current task count %s does not match saved task count %s. Stateful jobs may observe misalignment of keys!"
+ format (activeTaskNames.size(), previousTaskToContainerId.size()))
// If the tasks changed, then the partition-task grouping is also likely changed and we can't handle that
// without a much more complicated mapping. Further, the partition count may have changed, which means
// input message keys are likely reshuffled w.r.t. partitions, so the local state may not contain necessary
// data associated with the incoming keys. Warn the user and default to grouper
// In this scenario the tasks may have been reduced, so we need to delete all the existing messages
- taskAssignmentManager.deleteTaskContainerMappings(taskNames)
+ taskAssignmentManager.deleteTaskContainerMappings(previousTaskToContainerId.keys.map(taskName => taskName.getTaskName).asJava)
+ }
+
+ // If the set of standby task names has changed (e.g., the replication-factor or the active-task set changed),
+ // log it at info level and delete the previously stored mappings for those standby tasks
+ val previousStandbyTaskNames = taskAssignmentManager.readTaskModes().filter(x => x._2 == TaskMode.Standby).keySet.map(_.getTaskName)
+ if (standbyTaskNames.asScala != previousStandbyTaskNames) {
+ info("The set of standby tasks has changed, current standby tasks %s, previous standby tasks %s" format (standbyTaskNames, previousStandbyTaskNames))
+ taskAssignmentManager.deleteTaskContainerMappings(previousStandbyTaskNames.asJava)
}
for (container <- jobModel.getContainers.values()) {
for (taskName <- container.getTasks.keySet) {
- taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, TaskMode.Active)
+ taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, container.getTasks.get(taskName).getTaskMode)
}
}
}
@@ -293,15 +310,17 @@ object JobModelManager extends Logging {
// Here is where we should put in a pluggable option for the
// SSPTaskNameGrouper for locality, load-balancing, etc.
val containerGrouperFactory = Util.getObj(config.getTaskNameGrouperFactory, classOf[TaskNameGrouperFactory])
- val containerGrouper = containerGrouperFactory.build(config)
+ val standbyTasksEnabled = config.getStandbyTasksEnabled
+ val standbyTaskReplicationFactor = config.getStandbyTaskReplicationFactor
+ val taskNameGrouperProxy = new TaskNameGrouperProxy(containerGrouperFactory.build(config), standbyTasksEnabled, standbyTaskReplicationFactor)
var containerModels: util.Set[ContainerModel] = null
if(isHostAffinityEnabled) {
- containerModels = containerGrouper.group(taskModels, grouperMetadata)
+ containerModels = taskNameGrouperProxy.group(taskModels, grouperMetadata)
} else {
- containerModels = containerGrouper.group(taskModels, new util.ArrayList[String](grouperMetadata.getProcessorLocality.keySet()))
+ containerModels = taskNameGrouperProxy.group(taskModels, new util.ArrayList[String](grouperMetadata.getProcessorLocality.keySet()))
}
val containerMap = containerModels.asScala.map(containerModel => containerModel.getId -> containerModel).toMap
new JobModel(config, containerMap.asJava)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
index 9e6e8d0..c833598 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
@@ -18,17 +18,15 @@
*/
package org.apache.samza.container.grouper.task;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
+
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.SamzaException;
import org.junit.Test;
import static org.apache.samza.container.mock.ContainerMocks.*;
@@ -646,16 +644,6 @@ public class TestGroupByContainerCount {
containers.remove(containers.iterator().next());
}
- @Test(expected = SamzaException.class)
- public void testBalancerThrowsOnNonIntegerContainerIds() {
- Set<TaskModel> taskModels = generateTaskModels(3);
- Set<ContainerModel> prevContainers = new HashSet<>();
- taskModels.forEach(model -> prevContainers.add(new ContainerModel(UUID.randomUUID().toString(), Collections.singletonMap(model.getTaskName(), model))));
- Map<TaskName, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
- GrouperMetadataImpl grouperMetadata = new GrouperMetadataImpl(new HashMap<>(), new HashMap<>(), new HashMap<>(), prevTaskToContainerMapping);
- new GroupByContainerCount(3).group(taskModels, grouperMetadata); //Should throw
- }
-
+ // NOTE(review): testBalancerThrowsOnNonIntegerContainerIds was removed together with the integer check in
+ // GroupByContainerCount#getPreviousContainers; consider a test that pins how non-integer IDs are handled now.
@Test
public void testBalancerWithNullLocalityManager() {
Set<TaskModel> taskModels = generateTaskModels(3);
http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskNameGrouperProxy.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskNameGrouperProxy.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskNameGrouperProxy.java
new file mode 100644
index 0000000..c9369c3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskNameGrouperProxy.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+public class TestTaskNameGrouperProxy {
+ private TaskNameGrouperProxy standbyTaskGenerator;
+
+ @Test
+ public void testBuddyContainerBasedGenerationIdentity() {
+ this.standbyTaskGenerator = new TaskNameGrouperProxy(Mockito.mock(TaskNameGrouper.class), true, 2);
+
+ Assert.assertEquals("Shouldnt add standby tasks to empty container map", Collections.emptySet(),
+ this.standbyTaskGenerator.generateStandbyTasks(Collections.emptySet(), 1));
+
+ Assert.assertEquals("Shouldnt add standby tasks when repl factor = 1", getContainerMap(),
+ this.standbyTaskGenerator.generateStandbyTasks(getContainerMap(), 1));
+ }
+
+ @Test
+ public void testBuddyContainerBasedGenerationForVaryingRF() {
+ testBuddyContainerBasedGeneration(1);
+ testBuddyContainerBasedGeneration(2);
+ testBuddyContainerBasedGeneration(3);
+ }
+
+ // Checks that generateStandbyTasks keeps every active container and adds (replicationFactor - 1) standby copies of each task.
+ private void testBuddyContainerBasedGeneration(int replicationFactor) {
+ this.standbyTaskGenerator = new TaskNameGrouperProxy(Mockito.mock(TaskNameGrouper.class), true, replicationFactor);
+
+ Set<ContainerModel> initialContainerModels = getContainerMap();
+ Set<ContainerModel> containerModels =
+ standbyTaskGenerator.generateStandbyTasks(initialContainerModels, replicationFactor);
+
+ Assert.assertEquals(
+ "The generated map should have the required number of containers because repl fac = " + replicationFactor,
+ initialContainerModels.size() * replicationFactor, containerModels.size());
+
+ Assert.assertTrue("The generated map should have all active containers present as is",
+ containerModels.containsAll(initialContainerModels));
+
+ Assert.assertEquals(
+ "The generated map should have the required total number of tasks because repl fac = " + replicationFactor,
+ replicationFactor * initialContainerModels.stream().mapToInt(x -> x.getTasks().size()).sum(),
+ containerModels.stream().mapToInt(x -> x.getTasks().size()).sum());
+
+ long numActiveTasks = containerModels.stream()
+ .mapToLong(container -> container.getTasks()
+ .keySet()
+ .stream()
+ .filter(taskName -> !taskName.getTaskName().contains("Standby"))
+ .count())
+ .sum();
+
+ long numStandbyTasks = containerModels.stream()
+ .mapToLong(container -> container.getTasks()
+ .keySet()
+ .stream()
+ .filter(taskName -> taskName.getTaskName().contains("Standby"))
+ .count())
+ .sum();
+
+ Assert.assertEquals("The generated map should have the same number of active tasks as in the initial map",
+ initialContainerModels.stream().mapToInt(x -> x.getTasks().size()).sum(), numActiveTasks);
+
+ Assert.assertEquals("The generated map should have numActive * (repl-1) standby tasks",
+ numActiveTasks * (replicationFactor - 1), numStandbyTasks);
+ }
+
+ // Create a container map with two tasks in Container 0 and Container 1, and one in Container 2.
+ private static Set<ContainerModel> getContainerMap() {
+ Set<ContainerModel> retVal = new HashSet<>();
+
+ Map<TaskName, TaskModel> tasksForContainer0 = new HashMap<>();
+ tasksForContainer0.put(new TaskName("Partition 0"), getTaskModel(0));
+ tasksForContainer0.put(new TaskName("Partition 1"), getTaskModel(1));
+ retVal.add(new ContainerModel("0", tasksForContainer0));
+
+ Map<TaskName, TaskModel> tasksForContainer1 = new HashMap<>();
+ tasksForContainer1.put(new TaskName("Partition 2"), getTaskModel(2));
+ tasksForContainer1.put(new TaskName("Partition 3"), getTaskModel(3));
+ retVal.add(new ContainerModel("1", tasksForContainer1));
+
+ retVal.add(new ContainerModel("2", Collections.singletonMap(new TaskName("Partition 4"), getTaskModel(4))));
+ return retVal;
+ }
+
+ // Helper method that creates a taskmodel with one input ssp
+ private static TaskModel getTaskModel(int partitionNum) {
+ return new TaskModel(new TaskName("Partition " + partitionNum),
+ Collections.singleton(new SystemStreamPartition("test-system", "test-stream", new Partition(partitionNum))),
+ new Partition(partitionNum));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index ae3d801..7eb5768 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -176,6 +176,7 @@ public class TestJobModelManager {
// Mock the container to task assignment.
when(mockTaskAssignmentManager.readTaskAssignment()).thenReturn(taskAssignment);
+ when(mockTaskAssignmentManager.readTaskModes()).thenReturn(Collections.singletonMap(new TaskName("task-0"), TaskMode.Active));
GrouperMetadataImpl grouperMetadata = JobModelManager.getGrouperMetadata(new MapConfig(), mockLocalityManager, mockTaskAssignmentManager);
@@ -232,7 +233,7 @@ public class TestJobModelManager {
// Verifications
Mockito.verify(mockJobModel, atLeast(1)).getContainers();
- Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings((Iterable<String>) taskNames);
+ // NOTE(review): Mockito.any() no longer checks which task names are deleted; consider verifying the
+ // exact set of previously assigned task names passed to deleteTaskContainerMappings.
+ Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-1", "test-container-id", TaskMode.Active);
Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-2", "test-container-id", TaskMode.Active);
Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-3", "test-container-id", TaskMode.Active);