You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/01/22 23:08:50 UTC

samza git commit: StandbyTaskGenerator interface and a BuddyContainerBased implementation

Repository: samza
Updated Branches:
  refs/heads/master 3a2010604 -> 1cac1c8e5


StandbyTaskGenerator interface and a BuddyContainerBased implementation

This PR adds

* A StandbyTaskGenerator that is used to populate StandbyTasks once the SSP and TaskName groupers have generated a TaskModel.

* A BuddyContainerBased implementation of this generator that generates standby tasks and groups them into all-standby-task buddy-containers.

* JobModelManager changes to read taskModes and do grouping without them.

* Tests for StandbyTaskGenerator.

Author: Ray Matharu <rm...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #872 from rmatharu/standbygenerator


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1cac1c8e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1cac1c8e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1cac1c8e

Branch: refs/heads/master
Commit: 1cac1c8e5b8ec903dc88c6d60673080885c5f34a
Parents: 3a20106
Author: Ray Matharu <rm...@linkedin.com>
Authored: Tue Jan 22 15:08:45 2019 -0800
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Jan 22 15:08:45 2019 -0800

----------------------------------------------------------------------
 .../grouper/task/GroupByContainerCount.java     |   8 -
 .../grouper/task/TaskAssignmentManager.java     |  14 ++
 .../grouper/task/TaskNameGrouperProxy.java      | 151 +++++++++++++++++++
 .../org/apache/samza/config/JobConfig.scala     |   9 ++
 .../samza/coordinator/JobModelManager.scala     |  73 +++++----
 .../grouper/task/TestGroupByContainerCount.java |  14 +-
 .../grouper/task/TestTaskNameGrouperProxy.java  | 123 +++++++++++++++
 .../samza/coordinator/TestJobModelManager.java  |   3 +-
 8 files changed, 346 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 8a741db..23580f0 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.SamzaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -142,13 +141,6 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
    */
   private List<TaskGroup> getPreviousContainers(GrouperMetadata grouperMetadata, int taskCount) {
     Map<TaskName, String> taskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment();
-    taskToContainerId.values().forEach(id -> {
-        try {
-          int intId = Integer.parseInt(id);
-        } catch (NumberFormatException nfe) {
-          throw new SamzaException("GroupByContainerCount cannot handle non-integer processorIds!", nfe);
-        }
-      });
 
     if (taskToContainerId.isEmpty()) {
       LOG.info("No task assignment map was saved.");

http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index e151384..669ed57 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
@@ -77,6 +78,7 @@ public class TaskAssignmentManager {
    * @param metricsRegistry the registry for reporting metrics.
    * @param keySerde the key serializer.
    * @param containerIdSerde the value serializer.
+   * @param taskModeSerde the task-mode serializer.
    */
   public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry, Serde<String> keySerde, Serde<String> containerIdSerde, Serde<String> taskModeSerde) {
     this.config = config;
@@ -108,6 +110,18 @@ public class TaskAssignmentManager {
     return Collections.unmodifiableMap(new HashMap<>(taskNameToContainerId));
   }
 
+  public Map<TaskName, TaskMode> readTaskModes() {
+    Map<TaskName, TaskMode> taskModeMap = new HashMap<>();
+    taskModeMappingMetadataStore.all().forEach((taskName, valueBytes) -> {
+        String taskMode = taskModeSerde.fromBytes(valueBytes);
+        if (taskMode != null) {
+          taskModeMap.put(new TaskName(taskName), TaskMode.valueOf(taskMode));
+        }
+        LOG.debug("Task mode assignment for task {}: {}", taskName, taskMode);
+      });
+    return Collections.unmodifiableMap(new HashMap<>(taskModeMap));
+  }
+
   /**
    * Method to write task container info to {@link MetadataStore}.
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
new file mode 100644
index 0000000..c7d556a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This TaskNameGrouperProxy wraps around any given TaskNameGrouper.
+ * In addition to applying the provided grouper, this grouper also
+ * generates Standby-tasks and adds them to separate dedicated containers.
+ * It adds (r-1) Standby-Tasks for each active task, where r is the replication factor.
+ * Hence it adds r-1 additional containers for each regular container.
+ *
+ * All Standby-tasks are assigned a TaskName with a "Standby" prefix.
+ * The newly added containers carrying Standby tasks are assigned containerIDs derived from their
+ * active container's ID, i.e., activeContainerID-replicaNumber.
+ * For example:
+ *
+ * If the initial container model map is:
+ *
+ * Container 0 : (Partition 0, Partition 1)
+ * Container 1 : (Partition 2, Partition 3)
+ * with replicationFactor = 3
+ *
+ * The generated containerModel map is:
+ * Container 0 : (Partition 0, Partition 1)
+ * Container 1 : (Partition 2, Partition 3)
+ * Container 0-0 : (Standby-Partition 0-0, Standby-Partition 1-0)
+ * Container 1-0 : (Standby-Partition 2-0, Standby-Partition 3-0)
+ * Container 0-1 : (Standby-Partition 0-1, Standby-Partition 1-1)
+ * Container 1-1 : (Standby-Partition 2-1, Standby-Partition 3-1)
+ */
+public class TaskNameGrouperProxy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskNameGrouperProxy.class);
+  private static final String CONTAINER_ID_SEPARATOR = "-";
+  private static final String TASKNAME_SEPARATOR = "-";
+  private static final String STANDBY_TASKNAME_PREFIX = "Standby";
+  private final TaskNameGrouper taskNameGrouper;
+  private final boolean standbyTasksEnabled;
+  private final int replicationFactor;
+
+  public TaskNameGrouperProxy(TaskNameGrouper taskNameGrouper, boolean standbyTasksEnabled,
+      int replicationFactor) {
+    this.taskNameGrouper = taskNameGrouper;
+    this.standbyTasksEnabled = standbyTasksEnabled;
+    this.replicationFactor = replicationFactor;
+  }
+
+  public Set<ContainerModel> group(Set<TaskModel> taskModels, GrouperMetadata grouperMetadata) {
+    if (this.standbyTasksEnabled) {
+      return generateStandbyTasks(this.taskNameGrouper.group(taskModels, grouperMetadata), replicationFactor);
+    } else {
+      return this.taskNameGrouper.group(taskModels, grouperMetadata);
+    }
+  }
+
+  public Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> containersIds) {
+    if (this.standbyTasksEnabled) {
+      return generateStandbyTasks(this.taskNameGrouper.group(taskModels, containersIds), replicationFactor);
+    } else {
+      return this.taskNameGrouper.group(taskModels, containersIds);
+    }
+  }
+
+  /**
+   *  Generate a container model map with standby tasks added and grouped into buddy containers.
+   *  Package-private for testing.
+   *
+   * @param containerModels The initial container model map.
+   * @param replicationFactor The desired replication factor, if the replication-factor is n, we add n-1 standby tasks for each active task.
+   * @return The generated map of containerModels with added containers, and the initial regular containers
+   */
+  Set<ContainerModel> generateStandbyTasks(Set<ContainerModel> containerModels, int replicationFactor) {
+    LOG.info("Received current containerModel map : {}, replicationFactor : {}", containerModels, replicationFactor);
+    Set<ContainerModel> buddyContainers = new HashSet<>();
+
+    for (ContainerModel activeContainer : containerModels) {
+      for (int replicaNum = 0; replicaNum < replicationFactor - 1; replicaNum++) {
+        String buddyContainerId = getBuddyContainerId(activeContainer.getId(), replicaNum);
+
+        ContainerModel buddyContainerModel =
+            new ContainerModel(buddyContainerId, getTaskModelForBuddyContainer(activeContainer.getTasks(), replicaNum));
+
+        buddyContainers.add(buddyContainerModel);
+      }
+    }
+
+    LOG.info("Adding buddy containers : {}", buddyContainers);
+    buddyContainers.addAll(containerModels);
+    return buddyContainers;
+  }
+
+  // Helper method to populate the container model for a buddy container.
+  private static Map<TaskName, TaskModel> getTaskModelForBuddyContainer(
+      Map<TaskName, TaskModel> activeContainerTaskModel, int replicaNum) {
+    Map<TaskName, TaskModel> standbyTaskModels = new HashMap<>();
+
+    for (TaskName taskName : activeContainerTaskModel.keySet()) {
+      TaskName standbyTaskName = getStandbyTaskName(taskName, replicaNum);
+      TaskModel standbyTaskModel =
+          new TaskModel(standbyTaskName, activeContainerTaskModel.get(taskName).getSystemStreamPartitions(),
+              activeContainerTaskModel.get(taskName).getChangelogPartition(), TaskMode.Standby);
+      standbyTaskModels.put(standbyTaskName, standbyTaskModel);
+    }
+
+    LOG.info("Generated standbyTaskModels : {} for active task models : {}", standbyTaskModels,
+        activeContainerTaskModel);
+    return standbyTaskModels;
+  }
+
+  // Helper method to generate buddy containerIDs by appending the replica-number to the active-container's id.
+  private final static String getBuddyContainerId(String activeContainerId, int replicaNumber) {
+    return activeContainerId.concat(CONTAINER_ID_SEPARATOR).concat(String.valueOf(replicaNumber));
+  }
+
+  // Helper method to get the standby task name by prefixing "Standby" to the corresponding active task's name.
+  private final static TaskName getStandbyTaskName(TaskName activeTaskName, int replicaNum) {
+    return new TaskName(STANDBY_TASKNAME_PREFIX.concat(TASKNAME_SEPARATOR)
+        .concat(activeTaskName.getTaskName())
+        .concat(TASKNAME_SEPARATOR)
+        .concat(String.valueOf(replicaNum)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 2120c49..5eae194 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -110,6 +110,11 @@ object JobConfig {
   // Enables diagnostic appender for logging exception events
   val JOB_DIAGNOSTICS_ENABLED = "job.diagnostics.enabled"
 
+  // Enables standby tasks
+  val STANDBY_TASKS_ENABLED = "job.standbytasks.enabled"
+  val STANDBY_TASKS_REPLICATION_FACTOR = "job.standbytasks.replication.factor"
+  val DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 2
+
   // Specify DiagnosticAppender class
   val DIAGNOSTICS_APPENDER_CLASS = "job.diagnostics.appender.class"
   val DEFAULT_DIAGNOSTICS_APPENDER_CLASS = "org.apache.samza.logging.log4j.SimpleDiagnosticsAppender"
@@ -268,4 +273,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getSystemStreamPartitionMapperFactoryName: String = {
     get(JobConfig.SYSTEM_STREAM_PARTITION_MAPPER_FACTORY, classOf[HashSystemStreamPartitionMapperFactory].getName)
   }
+
+  def getStandbyTasksEnabled = getBoolean(JobConfig.STANDBY_TASKS_ENABLED, false)
+
+  def getStandbyTaskReplicationFactor = getInt(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, JobConfig.DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR)
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 5c983db..e49a61f 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -22,29 +22,24 @@ package org.apache.samza.coordinator
 import java.util
 import java.util.concurrent.atomic.AtomicReference
 
-import org.apache.samza.config._
+import org.apache.samza.Partition
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.Config
+import org.apache.samza.config.{Config, _}
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
 import org.apache.samza.container.grouper.task._
-import org.apache.samza.container.LocalityManager
-import org.apache.samza.container.TaskName
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, TaskModel}
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system._
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
-import org.apache.samza.Partition
+import org.apache.samza.container.{LocalityManager, TaskName}
+import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, TaskModel}
+import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
 import org.apache.samza.runtime.LocationId
+import org.apache.samza.system._
+import org.apache.samza.util.{Logging, Util}
 
-import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * Helper companion object that is responsible for wiring up a JobModelManager
@@ -104,7 +99,13 @@ object JobModelManager extends Logging {
     */
   def getGrouperMetadata(config: Config, localityManager: LocalityManager, taskAssignmentManager: TaskAssignmentManager) = {
     val processorLocality: util.Map[String, LocationId] = getProcessorLocality(config, localityManager)
-    val taskAssignment: util.Map[String, String] = taskAssignmentManager.readTaskAssignment()
+    val taskModes: util.Map[TaskName, TaskMode] = taskAssignmentManager.readTaskModes()
+
+    // we read the taskAssignment only for ActiveTasks
+    val taskAssignment: util.Map[String, String] = taskAssignmentManager.readTaskAssignment().
+      filterKeys(taskName => taskModes.get(new TaskName(taskName)).eq(TaskMode.Active))
+
+
     val taskNameToProcessorId: util.Map[TaskName, String] = new util.HashMap[TaskName, String]()
     for ((taskName, processorId) <- taskAssignment) {
       taskNameToProcessorId.put(new TaskName(taskName), processorId)
@@ -153,27 +154,43 @@ object JobModelManager extends Logging {
     * @param grouperMetadata       provides the historical metadata of the application.
     */
   def updateTaskAssignments(jobModel: JobModel, taskAssignmentManager: TaskAssignmentManager, grouperMetadata: GrouperMetadata): Unit = {
-    val taskNames: util.Set[String] = new util.HashSet[String]()
+    val activeTaskNames: util.Set[String] = new util.HashSet[String]()
+    val standbyTaskNames: util.Set[String] = new util.HashSet[String]()
     for (container <- jobModel.getContainers.values()) {
       for (taskModel <- container.getTasks.values()) {
-        taskNames.add(taskModel.getTaskName.getTaskName)
+        if(taskModel.getTaskMode.eq(TaskMode.Active)) {
+          activeTaskNames.add(taskModel.getTaskName.getTaskName)
+        }
+
+        if(taskModel.getTaskMode.eq(TaskMode.Standby)) {
+          standbyTaskNames.add(taskModel.getTaskName.getTaskName)
+        }
       }
     }
-    val taskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment
-    if (taskNames.size() != taskToContainerId.size()) {
-      warn("Current task count {} does not match saved task count {}. Stateful jobs may observe misalignment of keys!",
-           taskNames.size(), taskToContainerId.size())
+
+    val previousTaskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment
+    if (activeTaskNames.size() != previousTaskToContainerId.size()) {
+      warn("Current task count %s does not match saved task count %s. Stateful jobs may observe misalignment of keys!"
+        format (activeTaskNames.size(), previousTaskToContainerId.size()))
       // If the tasks changed, then the partition-task grouping is also likely changed and we can't handle that
       // without a much more complicated mapping. Further, the partition count may have changed, which means
       // input message keys are likely reshuffled w.r.t. partitions, so the local state may not contain necessary
       // data associated with the incoming keys. Warn the user and default to grouper
       // In this scenario the tasks may have been reduced, so we need to delete all the existing messages
-      taskAssignmentManager.deleteTaskContainerMappings(taskNames)
+      taskAssignmentManager.deleteTaskContainerMappings(previousTaskToContainerId.keys.map(taskName => taskName.getTaskName).asJava)
+    }
+
+    // if the set of standby tasks has changed, e.g., when the replication-factor changed, or the active-tasks-set has
+    // changed, we log a warning and delete the existing mapping for these tasks
+    val previousStandbyTasks = taskAssignmentManager.readTaskModes().filter(x => x._2.eq(TaskMode.Standby))
+    if(standbyTaskNames.asScala != previousStandbyTasks.keySet) {
+      info("The set of standby tasks has changed, current standby tasks %s, previous standby tasks %s" format (standbyTaskNames, previousStandbyTasks.keySet))
+      taskAssignmentManager.deleteTaskContainerMappings(previousStandbyTasks.map(x => x._1.getTaskName).asJava)
     }
 
     for (container <- jobModel.getContainers.values()) {
       for (taskName <- container.getTasks.keySet) {
-        taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, TaskMode.Active)
+        taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, container.getTasks.get(taskName).getTaskMode)
       }
     }
   }
@@ -293,15 +310,17 @@ object JobModelManager extends Logging {
     // Here is where we should put in a pluggable option for the
     // SSPTaskNameGrouper for locality, load-balancing, etc.
     val containerGrouperFactory = Util.getObj(config.getTaskNameGrouperFactory, classOf[TaskNameGrouperFactory])
-    val containerGrouper = containerGrouperFactory.build(config)
+    val standbyTasksEnabled = new JobConfig(config).getStandbyTasksEnabled
+    val standbyTaskReplicationFactor = new JobConfig(config).getStandbyTaskReplicationFactor
+    val taskNameGrouperProxy = new TaskNameGrouperProxy(containerGrouperFactory.build(config), standbyTasksEnabled, standbyTaskReplicationFactor)
     var containerModels: util.Set[ContainerModel] = null
     if(isHostAffinityEnabled) {
-      containerModels = containerGrouper.group(taskModels, grouperMetadata)
+      containerModels = taskNameGrouperProxy.group(taskModels, grouperMetadata)
     } else {
-      containerModels = containerGrouper.group(taskModels, new util.ArrayList[String](grouperMetadata.getProcessorLocality.keySet()))
+      containerModels = taskNameGrouperProxy.group(taskModels, new util.ArrayList[String](grouperMetadata.getProcessorLocality.keySet()))
     }
-    val containerMap = containerModels.asScala.map(containerModel => containerModel.getId -> containerModel).toMap
 
+    var containerMap = containerModels.asScala.map(containerModel => containerModel.getId -> containerModel).toMap
     new JobModel(config, containerMap.asJava)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
index 9e6e8d0..c833598 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
@@ -18,17 +18,15 @@
  */
 package org.apache.samza.container.grouper.task;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
+
 
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.SamzaException;
 import org.junit.Test;
 
 import static org.apache.samza.container.mock.ContainerMocks.*;
@@ -646,16 +644,6 @@ public class TestGroupByContainerCount {
     containers.remove(containers.iterator().next());
   }
 
-  @Test(expected = SamzaException.class)
-  public void testBalancerThrowsOnNonIntegerContainerIds() {
-    Set<TaskModel> taskModels = generateTaskModels(3);
-    Set<ContainerModel> prevContainers = new HashSet<>();
-    taskModels.forEach(model -> prevContainers.add(new ContainerModel(UUID.randomUUID().toString(), Collections.singletonMap(model.getTaskName(), model))));
-    Map<TaskName, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
-    GrouperMetadataImpl grouperMetadata = new GrouperMetadataImpl(new HashMap<>(), new HashMap<>(), new HashMap<>(), prevTaskToContainerMapping);
-    new GroupByContainerCount(3).group(taskModels, grouperMetadata); //Should throw
-  }
-
   @Test
   public void testBalancerWithNullLocalityManager() {
     Set<TaskModel> taskModels = generateTaskModels(3);

http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskNameGrouperProxy.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskNameGrouperProxy.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskNameGrouperProxy.java
new file mode 100644
index 0000000..c9369c3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskNameGrouperProxy.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+public class TestTaskNameGrouperProxy {
+  private TaskNameGrouperProxy standbyTaskGenerator;
+
+  @Test
+  public void testBuddyContainerBasedGenerationIdentity() {
+    this.standbyTaskGenerator = new TaskNameGrouperProxy(Mockito.mock(TaskNameGrouper.class), true, 2);
+
+    Assert.assertEquals("Shouldnt add standby tasks to empty container map", Collections.emptySet(),
+        this.standbyTaskGenerator.generateStandbyTasks(Collections.emptySet(), 1));
+
+    Assert.assertEquals("Shouldnt add standby tasks when repl factor = 1", getContainerMap(),
+        this.standbyTaskGenerator.generateStandbyTasks(getContainerMap(), 1));
+  }
+
+  @Test
+  public void testBuddyContainerBasedGenerationForVaryingRF() {
+    testBuddyContainerBasedGeneration(1);
+    testBuddyContainerBasedGeneration(2);
+    testBuddyContainerBasedGeneration(3);
+  }
+
+  private void testBuddyContainerBasedGeneration(int replicationFactor) {
+    this.standbyTaskGenerator = new TaskNameGrouperProxy(Mockito.mock(TaskNameGrouper.class), true, 2);
+
+    Set<ContainerModel> initialContainerModels = getContainerMap();
+    Set<ContainerModel> containerModels =
+        standbyTaskGenerator.generateStandbyTasks(initialContainerModels, replicationFactor);
+
+    Assert.assertEquals(
+        "The generated map should the required number of containers because repl fac = " + replicationFactor,
+        initialContainerModels.size() * replicationFactor, containerModels.size());
+
+    Assert.assertTrue("The generated map should have all active containers present as is",
+        containerModels.containsAll(initialContainerModels));
+
+    Assert.assertEquals(
+        "The generated map should have the required total number of tasks because repl fac = " + replicationFactor,
+        replicationFactor * initialContainerModels.stream().mapToInt(x -> x.getTasks().size()).sum(),
+        containerModels.stream().mapToInt(x -> x.getTasks().size()).sum());
+
+    int numActiveTasks = containerModels.stream()
+        .mapToInt(container -> container.getTasks()
+            .keySet()
+            .stream()
+            .filter(taskName -> !taskName.getTaskName().contains("Standby"))
+            .toArray().length)
+        .sum();
+
+    int numStandbyTasks = containerModels.stream()
+        .mapToInt(container -> container.getTasks()
+            .keySet()
+            .stream()
+            .filter(taskName -> taskName.getTaskName().contains("Standby"))
+            .toArray().length)
+        .sum();
+
+    Assert.assertEquals("The generated map should have the same number of active tasks as in the initial map",
+        numActiveTasks, initialContainerModels.stream().mapToInt(x -> x.getTasks().size()).sum());
+
+    Assert.assertEquals("The generated map should have numActive * (repl-1) standby tasks",
+        numActiveTasks * (replicationFactor - 1), numStandbyTasks);
+  }
+
+  // Create a container map with two tasks in Container 0 and Container 1, and one in Container 2.
+  private static Set<ContainerModel> getContainerMap() {
+    Set<ContainerModel> retVal = new HashSet<>();
+
+    Map<TaskName, TaskModel> tasksForContainer0 = new HashMap<>();
+    tasksForContainer0.put(new TaskName("Partition 0"), getTaskModel(0));
+    tasksForContainer0.put(new TaskName("Partition 1"), getTaskModel(1));
+    retVal.add(new ContainerModel("0", tasksForContainer0));
+
+    Map<TaskName, TaskModel> tasksForContainer1 = new HashMap<>();
+    tasksForContainer1.put(new TaskName("Partition 2"), getTaskModel(2));
+    tasksForContainer1.put(new TaskName("Partition 3"), getTaskModel(3));
+    retVal.add(new ContainerModel("1", tasksForContainer1));
+
+    retVal.add(new ContainerModel("2", Collections.singletonMap(new TaskName("Partition 4"), getTaskModel(4))));
+    return retVal;
+  }
+
+  // Helper method that creates a taskmodel with one input ssp
+  private static TaskModel getTaskModel(int partitionNum) {
+    return new TaskModel(new TaskName("Partition " + partitionNum),
+        Collections.singleton(new SystemStreamPartition("test-system", "test-stream", new Partition(partitionNum))),
+        new Partition(partitionNum));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1cac1c8e/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index ae3d801..7eb5768 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -176,6 +176,7 @@ public class TestJobModelManager {
 
     // Mock the container to task assignment.
     when(mockTaskAssignmentManager.readTaskAssignment()).thenReturn(taskAssignment);
+    when(mockTaskAssignmentManager.readTaskModes()).thenReturn(Collections.singletonMap(new TaskName("task-0"), TaskMode.Active));
 
     GrouperMetadataImpl grouperMetadata = JobModelManager.getGrouperMetadata(new MapConfig(), mockLocalityManager, mockTaskAssignmentManager);
 
@@ -232,7 +233,7 @@ public class TestJobModelManager {
 
     // Verifications
     Mockito.verify(mockJobModel, atLeast(1)).getContainers();
-    Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings((Iterable<String>) taskNames);
+    Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
     Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-1", "test-container-id", TaskMode.Active);
     Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-2", "test-container-id", TaskMode.Active);
     Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-3", "test-container-id", TaskMode.Active);