You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2020/02/05 17:48:34 UTC

[samza] branch master updated: SAMZA-2444: JobModel save in CoordinatorStreamStore resulting flush for each message (#1259)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new ca641dc  SAMZA-2444: JobModel save in CoordinatorStreamStore resulting flush for each message (#1259)
ca641dc is described below

commit ca641dcf7450682a69e8eda53b9bc40702b4ad7a
Author: Alan Zhang <59...@users.noreply.github.com>
AuthorDate: Wed Feb 5 09:48:24 2020 -0800

    SAMZA-2444: JobModel save in CoordinatorStreamStore resulting flush for each message (#1259)
---
 .../placement/ContainerPlacementMetadataStore.java |  5 +++
 .../apache/samza/container/LocalityManager.java    |  1 +
 .../grouper/task/TaskAssignmentManager.java        | 48 +++++++++++++---------
 .../task/TaskPartitionAssignmentManager.java       | 43 ++++++++++---------
 .../apache/samza/coordinator/RunIdGenerator.java   |  1 +
 .../metadatastore/CoordinatorStreamStore.java      | 13 ------
 .../apache/samza/execution/LocalJobPlanner.java    |  1 +
 .../org/apache/samza/job/model/JobModelUtil.java   |  1 +
 .../apache/samza/startpoint/StartpointManager.java |  6 +++
 .../samza/storage/ChangelogStreamManager.java      |  1 +
 .../java/org/apache/samza/zk/ZkJobCoordinator.java |  1 +
 .../apache/samza/coordinator/JobModelManager.scala | 12 +++---
 .../grouper/task/TestTaskAssignmentManager.java    | 17 +++++---
 .../task/TestTaskPartitionAssignmentManager.java   | 24 +++++------
 .../samza/coordinator/TestJobModelManager.java     | 18 ++++----
 .../samza/coordinator/TestRunIdGenerator.java      |  1 +
 .../metadatastore/TestCoordinatorStreamStore.java  |  1 -
 .../samza/runtime/TestLocalApplicationRunner.java  |  2 +
 18 files changed, 109 insertions(+), 87 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
index 2eb3afa..02f698d 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
@@ -119,6 +119,7 @@ public class ContainerPlacementMetadataStore {
     try {
       containerPlacementMessageStore.put(toContainerPlacementMessageKey(message.getUuid(), message.getClass()),
           objectMapper.writeValueAsBytes(message));
+      containerPlacementMessageStore.flush();
     } catch (Exception ex) {
       throw new SamzaException(
           String.format("ContainerPlacementRequestMessage might have been not written to metastore %s", message), ex);
@@ -137,6 +138,7 @@ public class ContainerPlacementMetadataStore {
     try {
       containerPlacementMessageStore.put(toContainerPlacementMessageKey(message.getUuid(), message.getClass()),
           objectMapper.writeValueAsBytes(message));
+      containerPlacementMessageStore.flush();
     } catch (Exception ex) {
       throw new SamzaException(
           String.format("ContainerPlacementResponseMessage might have been not written to metastore %s", message), ex);
@@ -199,6 +201,7 @@ public class ContainerPlacementMetadataStore {
     Preconditions.checkState(!stopped, "Underlying metadata store not available");
     Preconditions.checkNotNull(uuid, "uuid cannot be null");
     containerPlacementMessageStore.delete(toContainerPlacementMessageKey(uuid, ContainerPlacementRequestMessage.class));
+    containerPlacementMessageStore.flush();
   }
 
   /**
@@ -209,6 +212,7 @@ public class ContainerPlacementMetadataStore {
     Preconditions.checkState(!stopped, "Underlying metadata store not available");
     Preconditions.checkNotNull(uuid, "uuid cannot be null");
     containerPlacementMessageStore.delete(toContainerPlacementMessageKey(uuid, ContainerPlacementResponseMessage.class));
+    containerPlacementMessageStore.flush();
   }
 
   /**
@@ -230,6 +234,7 @@ public class ContainerPlacementMetadataStore {
     for (String key : requestKeys) {
       containerPlacementMessageStore.delete(key);
     }
+    containerPlacementMessageStore.flush();
   }
 
   static String toContainerPlacementMessageKey(UUID uuid, Class<?> messageType) {
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 05f2e8b..864b558 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -94,6 +94,7 @@ public class LocalityManager {
     }
 
     metadataStore.put(containerId, valueSerde.toBytes(hostName));
+    metadataStore.flush();
   }
 
   public void close() {
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 16f8a51..e9fcadb 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -96,29 +96,35 @@ public class TaskAssignmentManager {
   }
 
   /**
-   * Method to write task container info to {@link MetadataStore}.
-   *
-   * @param taskName    the task name
-   * @param containerId the SamzaContainer ID or {@code null} to delete the mapping
-   * @param taskMode the mode of the task
+   * Method to batch write task container info to {@link MetadataStore}.
+   * @param mappings the task and container mappings: (ContainerId, (TaskName, TaskMode))
    */
-  public void writeTaskContainerMapping(String taskName, String containerId, TaskMode taskMode) {
-    String existingContainerId = taskNameToContainerId.get(taskName);
-    if (existingContainerId != null && !existingContainerId.equals(containerId)) {
-      LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId});
-    } else {
-      LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId);
-    }
+  public void writeTaskContainerMappings(Map<String, Map<String, TaskMode>> mappings) {
+    for (String containerId : mappings.keySet()) {
+      Map<String, TaskMode> tasks = mappings.get(containerId);
+      for (String taskName : tasks.keySet()) {
+        TaskMode taskMode = tasks.get(taskName);
+        LOG.info("Storing task: {} and container ID: {} into metadata store", taskName, containerId);
+        String existingContainerId = taskNameToContainerId.get(taskName);
+        if (existingContainerId != null && !existingContainerId.equals(containerId)) {
+          LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId});
+        } else {
+          LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId);
+        }
 
-    if (containerId == null) {
-      taskContainerMappingMetadataStore.delete(taskName);
-      taskModeMappingMetadataStore.delete(taskName);
-      taskNameToContainerId.remove(taskName);
-    } else {
-      taskContainerMappingMetadataStore.put(taskName, containerIdSerde.toBytes(containerId));
-      taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString()));
-      taskNameToContainerId.put(taskName, containerId);
+        if (containerId == null) {
+          taskContainerMappingMetadataStore.delete(taskName);
+          taskModeMappingMetadataStore.delete(taskName);
+          taskNameToContainerId.remove(taskName);
+        } else {
+          taskContainerMappingMetadataStore.put(taskName, containerIdSerde.toBytes(containerId));
+          taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString()));
+          taskNameToContainerId.put(taskName, containerId);
+        }
+      }
     }
+    taskContainerMappingMetadataStore.flush();
+    taskModeMappingMetadataStore.flush();
   }
 
   /**
@@ -132,6 +138,8 @@ public class TaskAssignmentManager {
       taskModeMappingMetadataStore.delete(taskName);
       taskNameToContainerId.remove(taskName);
     }
+    taskContainerMappingMetadataStore.flush();
+    taskModeMappingMetadataStore.flush();
   }
 
   public void close() {
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
index 7e32f0a..9b9c71e 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
@@ -66,28 +66,32 @@ public class TaskPartitionAssignmentManager {
   }
 
   /**
-   * Stores the task to partition assignments to the metadata store.
-   * @param partition the system stream partition.
-   * @param taskNames the task names to which the partition is assigned to.
+   * Stores the task names to {@link SystemStreamPartition} assignments to the metadata store.
+   * @param sspToTaskNameMapping the mapped assignments to write to the metadata store. If the task name list is empty,
+   *                             then the entry is deleted from the metadata store.
    */
-  public void writeTaskPartitionAssignment(SystemStreamPartition partition, List<String> taskNames) {
-    // For broadcast streams, a input system stream partition will be mapped to more than one tasks in a
-    // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here
-    // systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka.
-    String serializedSSPAsJson = serializeSSPToJson(partition);
-    if (taskNames == null || taskNames.isEmpty()) {
-      LOG.info("Deleting the key: {} from the metadata store.", partition);
-      metadataStore.delete(serializedSSPAsJson);
-    } else {
-      try {
-        String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
-        byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString);
-        LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames);
-        metadataStore.put(serializedSSPAsJson, taskNamesAsBytes);
-      } catch (Exception e) {
-        throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
+  public void writeTaskPartitionAssignments(Map<SystemStreamPartition, List<String>> sspToTaskNameMapping) {
+    for (SystemStreamPartition partition: sspToTaskNameMapping.keySet()) {
+      List<String> taskNames = sspToTaskNameMapping.get(partition);
+      // For broadcast streams, a input system stream partition will be mapped to more than one tasks in a
+      // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here
+      // systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka.
+      String serializedSSPAsJson = serializeSSPToJson(partition);
+      if (taskNames == null || taskNames.isEmpty()) {
+        LOG.info("Deleting the key: {} from the metadata store.", partition);
+        metadataStore.delete(serializedSSPAsJson);
+      } else {
+        try {
+          String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
+          byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString);
+          LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames);
+          metadataStore.put(serializedSSPAsJson, taskNamesAsBytes);
+        } catch (Exception e) {
+          throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
+        }
       }
     }
+    metadataStore.flush();
   }
 
   /**
@@ -120,6 +124,7 @@ public class TaskPartitionAssignmentManager {
       String serializedSSPAsJson = serializeSSPToJson(systemStreamPartition);
       metadataStore.delete(serializedSSPAsJson);
     }
+    metadataStore.flush();
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
index 284c0bf..ddcc72b 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
@@ -84,6 +84,7 @@ public class RunIdGenerator {
               String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
           LOG.info("Writing the run id for this run as {}", runId);
           metadataStore.put(CoordinationConstants.RUNID_STORE_KEY, runId.getBytes("UTF-8"));
+          metadataStore.flush();
         } else {
           runId = new String(metadataStore.get(CoordinationConstants.RUNID_STORE_KEY));
           LOG.info("Read the run id for this run as {}", runId);
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index e01a4c6..24ce457 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -126,19 +126,6 @@ public class CoordinatorStreamStore implements MetadataStore {
 
   @Override
   public void put(String namespacedKey, byte[] value) {
-    putWithoutFlush(namespacedKey, value);
-    flush();
-  }
-
-  @Override
-  public void putAll(Map<String, byte[]> entries) {
-    for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
-      putWithoutFlush(entry.getKey(), entry.getValue());
-    }
-    flush();
-  }
-
-  private void putWithoutFlush(String namespacedKey, byte[] value) {
     // 1. Store the namespace and key into correct fields of the CoordinatorStreamKey and convert the key to bytes.
     CoordinatorMessageKey coordinatorMessageKey = deserializeCoordinatorMessageKeyFromJson(namespacedKey);
     CoordinatorStreamKeySerde keySerde = new CoordinatorStreamKeySerde(coordinatorMessageKey.getNamespace());
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
index 000e55a..f55f02f 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -190,6 +190,7 @@ public class LocalJobPlanner extends JobPlanner {
           streamManager.createStreams(intStreams);
           String streamCreatedMessage = "Streams created by processor " + processorId;
           metadataStore.put(String.format(STREAM_CREATED_STATE_KEY, lockId), streamCreatedMessage.getBytes("UTF-8"));
+          metadataStore.flush();
           distributedLock.unlock();
           break;
         } else {
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index e230b1a..377fa94 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -80,6 +80,7 @@ public class JobModelUtil {
       byte[] jobModelSerializedAsBytes = jobModelSerializedAsString.getBytes(UTF_8);
       String metadataStoreKey = getJobModelKey(jobModelVersion);
       metadataStore.put(metadataStoreKey, jobModelSerializedAsBytes);
+      metadataStore.flush();
     } catch (Exception e) {
       throw new SamzaException(String.format("Exception occurred when storing JobModel: %s with version: %s.", jobModel, jobModelVersion), e);
     }
diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
index ba5acb7..9a196c3 100644
--- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
@@ -145,6 +145,7 @@ public class StartpointManager {
 
     try {
       readWriteStore.put(toReadWriteStoreKey(ssp, taskName), objectMapper.writeValueAsBytes(startpoint));
+      readWriteStore.flush();
     } catch (Exception ex) {
       throw new SamzaException(String.format(
           "Startpoint for SSP: %s and task: %s may not have been written to the metadata store.", ssp, taskName), ex);
@@ -208,6 +209,7 @@ public class StartpointManager {
     Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
 
     readWriteStore.delete(toReadWriteStoreKey(ssp, taskName));
+    readWriteStore.flush();
   }
 
   /**
@@ -218,6 +220,7 @@ public class StartpointManager {
     for (String key : readWriteKeys) {
       readWriteStore.delete(key);
     }
+    readWriteStore.flush();
   }
 
   /**
@@ -273,6 +276,7 @@ public class StartpointManager {
       StartpointFanOutPerTask newFanOut = fanOuts.get(taskName);
       fanOutStore.put(fanOutKey, objectMapper.writeValueAsBytes(newFanOut));
     }
+    fanOutStore.flush();
 
     for (SystemStreamPartition ssp : deleteKeys.keySet()) {
       for (TaskName taskName : deleteKeys.get(ssp)) {
@@ -314,6 +318,7 @@ public class StartpointManager {
     Preconditions.checkNotNull(taskName, "TaskName cannot be null");
 
     fanOutStore.delete(toFanOutStoreKey(taskName));
+    fanOutStore.flush();
   }
 
   /**
@@ -324,6 +329,7 @@ public class StartpointManager {
     for (String key : fanOutKeys) {
       fanOutStore.delete(key);
     }
+    fanOutStore.flush();
   }
 
   @VisibleForTesting
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
index e86e21a..4eed058 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -102,6 +102,7 @@ public class ChangelogStreamManager {
         metadataStore.delete(taskName);
       }
     }
+    metadataStore.flush();
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index dfd0304..feaabba 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -324,6 +324,7 @@ public class ZkJobCoordinator implements JobCoordinator {
           byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
           configStore.put(entry.getKey(), serializedValue);
         }
+        configStore.flush();
 
         // fan out the startpoints
         StartpointManager startpointManager = createStartpointManager();
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index b8c18fe..e2dbf3f 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -240,10 +240,12 @@ object JobModelManager extends Logging {
     // taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
     val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = new util.HashMap[SystemStreamPartition, util.List[String]]()
 
+    val taskContainerMappings: util.Map[String, util.Map[String, TaskMode]] = new util.HashMap[String, util.Map[String, TaskMode]]()
+
     for (container <- jobModel.getContainers.values()) {
       for ((taskName, taskModel) <- container.getTasks) {
-        info ("Storing task: %s and container ID: %s into metadata store" format(taskName.getTaskName, container.getId))
-        taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, container.getTasks.get(taskName).getTaskMode)
+        taskContainerMappings.putIfAbsent(container.getId, new util.HashMap[String, TaskMode]())
+        taskContainerMappings.get(container.getId).put(taskName.getTaskName, container.getTasks.get(taskName).getTaskMode)
         for (partition <- taskModel.getSystemStreamPartitions) {
           if (!sspToTaskNameMap.containsKey(partition)) {
             sspToTaskNameMap.put(partition, new util.ArrayList[String]())
@@ -253,10 +255,8 @@ object JobModelManager extends Logging {
       }
     }
 
-    for ((ssp, taskNames) <- sspToTaskNameMap) {
-      info ("Storing ssp: %s and task: %s into metadata store" format(ssp, taskNames))
-      taskPartitionAssignmentManager.writeTaskPartitionAssignment(ssp, taskNames)
-    }
+    taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings)
+    taskPartitionAssignmentManager.writeTaskPartitionAssignments(sspToTaskNameMap);
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 357e8ae..34909e3 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -60,10 +60,13 @@ public class TestTaskAssignmentManager {
   @Test
   public void testTaskAssignmentManager() {
     Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1", "Task2", "2", "Task3", "0", "Task4", "1");
+    Map<String, Map<String, TaskMode>> taskContainerMappings = ImmutableMap.of(
+        "0", ImmutableMap.of("Task0", TaskMode.Active, "Task3", TaskMode.Active),
+        "1", ImmutableMap.of("Task1", TaskMode.Active, "Task4", TaskMode.Active),
+        "2", ImmutableMap.of("Task2", TaskMode.Active)
+    );
 
-    for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
-      taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
-    }
+    taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings);
 
     Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
 
@@ -75,10 +78,12 @@ public class TestTaskAssignmentManager {
   @Test
   public void testDeleteMappings() {
     Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1");
+    Map<String, Map<String, TaskMode>> taskContainerMappings = ImmutableMap.of(
+        "0", ImmutableMap.of("Task0", TaskMode.Active),
+        "1", ImmutableMap.of("Task1", TaskMode.Active)
+    );
 
-    for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
-      taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
-    }
+    taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings);
 
     Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
     assertEquals(expectedMap, localMap);
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
index 1a7f0c9..1a0dc48 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
@@ -63,7 +63,7 @@ public class TestTaskPartitionAssignmentManager {
   @Test
   public void testReadAfterWrite() {
     List<String> testTaskNames = ImmutableList.of("test-task1", "test-task2", "test-task3");
-    taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames);
+    taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames));
 
     Map<SystemStreamPartition, List<String>> expectedMapping = ImmutableMap.of(testSystemStreamPartition, testTaskNames);
     Map<SystemStreamPartition, List<String>> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments();
@@ -74,7 +74,7 @@ public class TestTaskPartitionAssignmentManager {
   @Test
   public void testDeleteAfterWrite() {
     List<String> testTaskNames = ImmutableList.of("test-task1", "test-task2", "test-task3");
-    taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames);
+    taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames));
 
     Map<SystemStreamPartition, List<String>> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments();
     Assert.assertEquals(1, actualMapping.size());
@@ -94,12 +94,13 @@ public class TestTaskPartitionAssignmentManager {
     SystemStreamPartition testSystemStreamPartition3 = new SystemStreamPartition(TEST_SYSTEM, "stream-3", PARTITION);
     List<String> testTaskNames3 = ImmutableList.of("test-task6", "test-task7", "test-task8");
 
-    taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition1, testTaskNames1);
-    taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition2, testTaskNames2);
-    taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition3, testTaskNames3);
+    taskPartitionAssignmentManager.writeTaskPartitionAssignments(
+        ImmutableMap.of(testSystemStreamPartition1, testTaskNames1, testSystemStreamPartition2, testTaskNames2,
+            testSystemStreamPartition3, testTaskNames3));
 
-    Map<SystemStreamPartition, List<String>> expectedMapping = ImmutableMap.of(testSystemStreamPartition1, testTaskNames1,
-            testSystemStreamPartition2, testTaskNames2, testSystemStreamPartition3, testTaskNames3);
+    Map<SystemStreamPartition, List<String>> expectedMapping =
+        ImmutableMap.of(testSystemStreamPartition1, testTaskNames1, testSystemStreamPartition2, testTaskNames2,
+            testSystemStreamPartition3, testTaskNames3);
     Map<SystemStreamPartition, List<String>> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments();
 
     Assert.assertEquals(expectedMapping, actualMapping);
@@ -108,14 +109,11 @@ public class TestTaskPartitionAssignmentManager {
   @Test
   public void testMultipleUpdatesReturnsTheMostRecentValue() {
     List<String> testTaskNames1 = ImmutableList.of("test-task1", "test-task2", "test-task3");
-
-    taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames1);
-
+    taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames1));
     List<String> testTaskNames2 = ImmutableList.of("test-task4", "test-task5");
-    taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames2);
-
+    taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames2));
     List<String> testTaskNames3 = ImmutableList.of("test-task6", "test-task7", "test-task8");
-    taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames3);
+    taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames3));
 
     Map<SystemStreamPartition, List<String>> expectedMapping = ImmutableMap.of(testSystemStreamPartition, testTaskNames3);
     Map<SystemStreamPartition, List<String>> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments();
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index d58cf18..fe94e83 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -270,7 +270,7 @@ public class TestJobModelManager {
 
     when(mockJobModel.getContainers()).thenReturn(containerMapping);
     when(mockGrouperMetadata.getPreviousTaskToProcessorAssignment()).thenReturn(new HashMap<>());
-    Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMapping(Mockito.any(), Mockito.any(), Mockito.any());
+    Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMappings(Mockito.any());
 
     JobModelManager.updateTaskAssignments(mockJobModel, mockTaskAssignmentManager, mockTaskPartitionAssignmentManager, mockGrouperMetadata);
 
@@ -289,18 +289,18 @@ public class TestJobModelManager {
     // Verifications
     Mockito.verify(mockJobModel, atLeast(1)).getContainers();
     Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
-    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-1", "test-container-id", TaskMode.Active);
-    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-2", "test-container-id", TaskMode.Active);
-    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-3", "test-container-id", TaskMode.Active);
-    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-4", "test-container-id", TaskMode.Active);
+    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMappings(ImmutableMap.of("test-container-id",
+        ImmutableMap.of("task-1", TaskMode.Active, "task-2", TaskMode.Active, "task-3", TaskMode.Active, "task-4", TaskMode.Active)));
 
     // Verify that the old, stale partition mappings had been purged in the coordinator stream.
     Mockito.verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions);
 
     // Verify that the new task to partition assignment is stored in the coordinator stream.
-    Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition1, ImmutableList.of("task-1"));
-    Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition2, ImmutableList.of("task-2"));
-    Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition3, ImmutableList.of("task-3"));
-    Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition4, ImmutableList.of("task-4"));
+    Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(ImmutableMap.of(
+        testSystemStreamPartition1, ImmutableList.of("task-1"),
+        testSystemStreamPartition2, ImmutableList.of("task-2"),
+        testSystemStreamPartition3, ImmutableList.of("task-3"),
+        testSystemStreamPartition4, ImmutableList.of("task-4")
+    ));
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java
index 39bc583..ef24541 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java
@@ -51,6 +51,7 @@ public class TestRunIdGenerator {
     verify(membership, Mockito.times(1)).registerProcessor();
     verify(membership, Mockito.times(1)).getNumberOfProcessors();
     verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+    verify(metadataStore, Mockito.times(1)).flush();
   }
 
   @Test
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
index e073a0e..5865de9 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
@@ -105,7 +105,6 @@ public class TestCoordinatorStreamStore {
     Assert.assertEquals(value3, spyCoordinatorStreamStore.get(key3));
     Assert.assertEquals(value4, spyCoordinatorStreamStore.get(key4));
     Assert.assertEquals(value5, spyCoordinatorStreamStore.get(key5));
-    Mockito.verify(spyCoordinatorStreamStore).flush(); // verify flush called only once during putAll
   }
 
   @Test
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 972a042..cef7906 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -457,6 +457,7 @@ public class TestLocalApplicationRunner {
     verify(coordinationUtils, Mockito.times(1)).getLock(CoordinationConstants.RUNID_LOCK_ID);
     verify(clusterMembership, Mockito.times(1)).getNumberOfProcessors();
     verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+    verify(metadataStore, Mockito.times(1)).flush();
   }
 
   /**
@@ -483,6 +484,7 @@ public class TestLocalApplicationRunner {
     verify(coordinationUtils, Mockito.times(0)).getClusterMembership();
     verify(clusterMembership, Mockito.times(0)).getNumberOfProcessors();
     verify(metadataStore, Mockito.times(0)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+    verify(metadataStore, Mockito.times(0)).flush();
   }
 
   private void prepareTestForRunId() throws Exception {