You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2020/02/05 17:48:34 UTC
[samza] branch master updated: SAMZA-2444: JobModel save in
CoordinatorStreamStore resulting flush for each message (#1259)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new ca641dc SAMZA-2444: JobModel save in CoordinatorStreamStore resulting flush for each message (#1259)
ca641dc is described below
commit ca641dcf7450682a69e8eda53b9bc40702b4ad7a
Author: Alan Zhang <59...@users.noreply.github.com>
AuthorDate: Wed Feb 5 09:48:24 2020 -0800
SAMZA-2444: JobModel save in CoordinatorStreamStore resulting flush for each message (#1259)
---
.../placement/ContainerPlacementMetadataStore.java | 5 +++
.../apache/samza/container/LocalityManager.java | 1 +
.../grouper/task/TaskAssignmentManager.java | 48 +++++++++++++---------
.../task/TaskPartitionAssignmentManager.java | 43 ++++++++++---------
.../apache/samza/coordinator/RunIdGenerator.java | 1 +
.../metadatastore/CoordinatorStreamStore.java | 13 ------
.../apache/samza/execution/LocalJobPlanner.java | 1 +
.../org/apache/samza/job/model/JobModelUtil.java | 1 +
.../apache/samza/startpoint/StartpointManager.java | 6 +++
.../samza/storage/ChangelogStreamManager.java | 1 +
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 1 +
.../apache/samza/coordinator/JobModelManager.scala | 12 +++---
.../grouper/task/TestTaskAssignmentManager.java | 17 +++++---
.../task/TestTaskPartitionAssignmentManager.java | 24 +++++------
.../samza/coordinator/TestJobModelManager.java | 18 ++++----
.../samza/coordinator/TestRunIdGenerator.java | 1 +
.../metadatastore/TestCoordinatorStreamStore.java | 1 -
.../samza/runtime/TestLocalApplicationRunner.java | 2 +
18 files changed, 109 insertions(+), 87 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
index 2eb3afa..02f698d 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
@@ -119,6 +119,7 @@ public class ContainerPlacementMetadataStore {
try {
containerPlacementMessageStore.put(toContainerPlacementMessageKey(message.getUuid(), message.getClass()),
objectMapper.writeValueAsBytes(message));
+ containerPlacementMessageStore.flush();
} catch (Exception ex) {
throw new SamzaException(
String.format("ContainerPlacementRequestMessage might have been not written to metastore %s", message), ex);
@@ -137,6 +138,7 @@ public class ContainerPlacementMetadataStore {
try {
containerPlacementMessageStore.put(toContainerPlacementMessageKey(message.getUuid(), message.getClass()),
objectMapper.writeValueAsBytes(message));
+ containerPlacementMessageStore.flush();
} catch (Exception ex) {
throw new SamzaException(
String.format("ContainerPlacementResponseMessage might have been not written to metastore %s", message), ex);
@@ -199,6 +201,7 @@ public class ContainerPlacementMetadataStore {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(uuid, "uuid cannot be null");
containerPlacementMessageStore.delete(toContainerPlacementMessageKey(uuid, ContainerPlacementRequestMessage.class));
+ containerPlacementMessageStore.flush();
}
/**
@@ -209,6 +212,7 @@ public class ContainerPlacementMetadataStore {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(uuid, "uuid cannot be null");
containerPlacementMessageStore.delete(toContainerPlacementMessageKey(uuid, ContainerPlacementResponseMessage.class));
+ containerPlacementMessageStore.flush();
}
/**
@@ -230,6 +234,7 @@ public class ContainerPlacementMetadataStore {
for (String key : requestKeys) {
containerPlacementMessageStore.delete(key);
}
+ containerPlacementMessageStore.flush();
}
static String toContainerPlacementMessageKey(UUID uuid, Class<?> messageType) {
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 05f2e8b..864b558 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -94,6 +94,7 @@ public class LocalityManager {
}
metadataStore.put(containerId, valueSerde.toBytes(hostName));
+ metadataStore.flush();
}
public void close() {
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 16f8a51..e9fcadb 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -96,29 +96,35 @@ public class TaskAssignmentManager {
}
/**
- * Method to write task container info to {@link MetadataStore}.
- *
- * @param taskName the task name
- * @param containerId the SamzaContainer ID or {@code null} to delete the mapping
- * @param taskMode the mode of the task
+ * Method to batch write task container info to {@link MetadataStore}.
+ * @param mappings the task and container mappings: (ContainerId, (TaskName, TaskMode))
*/
- public void writeTaskContainerMapping(String taskName, String containerId, TaskMode taskMode) {
- String existingContainerId = taskNameToContainerId.get(taskName);
- if (existingContainerId != null && !existingContainerId.equals(containerId)) {
- LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId});
- } else {
- LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId);
- }
+ public void writeTaskContainerMappings(Map<String, Map<String, TaskMode>> mappings) {
+ for (String containerId : mappings.keySet()) {
+ Map<String, TaskMode> tasks = mappings.get(containerId);
+ for (String taskName : tasks.keySet()) {
+ TaskMode taskMode = tasks.get(taskName);
+ LOG.info("Storing task: {} and container ID: {} into metadata store", taskName, containerId);
+ String existingContainerId = taskNameToContainerId.get(taskName);
+ if (existingContainerId != null && !existingContainerId.equals(containerId)) {
+ LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId});
+ } else {
+ LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId);
+ }
- if (containerId == null) {
- taskContainerMappingMetadataStore.delete(taskName);
- taskModeMappingMetadataStore.delete(taskName);
- taskNameToContainerId.remove(taskName);
- } else {
- taskContainerMappingMetadataStore.put(taskName, containerIdSerde.toBytes(containerId));
- taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString()));
- taskNameToContainerId.put(taskName, containerId);
+ if (containerId == null) {
+ taskContainerMappingMetadataStore.delete(taskName);
+ taskModeMappingMetadataStore.delete(taskName);
+ taskNameToContainerId.remove(taskName);
+ } else {
+ taskContainerMappingMetadataStore.put(taskName, containerIdSerde.toBytes(containerId));
+ taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString()));
+ taskNameToContainerId.put(taskName, containerId);
+ }
+ }
}
+ taskContainerMappingMetadataStore.flush();
+ taskModeMappingMetadataStore.flush();
}
/**
@@ -132,6 +138,8 @@ public class TaskAssignmentManager {
taskModeMappingMetadataStore.delete(taskName);
taskNameToContainerId.remove(taskName);
}
+ taskContainerMappingMetadataStore.flush();
+ taskModeMappingMetadataStore.flush();
}
public void close() {
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
index 7e32f0a..9b9c71e 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
@@ -66,28 +66,32 @@ public class TaskPartitionAssignmentManager {
}
/**
- * Stores the task to partition assignments to the metadata store.
- * @param partition the system stream partition.
- * @param taskNames the task names to which the partition is assigned to.
+ * Stores the task names to {@link SystemStreamPartition} assignments to the metadata store.
+ * @param sspToTaskNameMapping the mapped assignments to write to the metadata store. If the task name list is empty,
+ * then the entry is deleted from the metadata store.
*/
- public void writeTaskPartitionAssignment(SystemStreamPartition partition, List<String> taskNames) {
- // For broadcast streams, a input system stream partition will be mapped to more than one tasks in a
- // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here
- // systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka.
- String serializedSSPAsJson = serializeSSPToJson(partition);
- if (taskNames == null || taskNames.isEmpty()) {
- LOG.info("Deleting the key: {} from the metadata store.", partition);
- metadataStore.delete(serializedSSPAsJson);
- } else {
- try {
- String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
- byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString);
- LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames);
- metadataStore.put(serializedSSPAsJson, taskNamesAsBytes);
- } catch (Exception e) {
- throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
+ public void writeTaskPartitionAssignments(Map<SystemStreamPartition, List<String>> sspToTaskNameMapping) {
+ for (SystemStreamPartition partition: sspToTaskNameMapping.keySet()) {
+ List<String> taskNames = sspToTaskNameMapping.get(partition);
+ // For broadcast streams, a input system stream partition will be mapped to more than one tasks in a
+ // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here
+ // systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka.
+ String serializedSSPAsJson = serializeSSPToJson(partition);
+ if (taskNames == null || taskNames.isEmpty()) {
+ LOG.info("Deleting the key: {} from the metadata store.", partition);
+ metadataStore.delete(serializedSSPAsJson);
+ } else {
+ try {
+ String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
+ byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString);
+ LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames);
+ metadataStore.put(serializedSSPAsJson, taskNamesAsBytes);
+ } catch (Exception e) {
+ throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
+ }
}
}
+ metadataStore.flush();
}
/**
@@ -120,6 +124,7 @@ public class TaskPartitionAssignmentManager {
String serializedSSPAsJson = serializeSSPToJson(systemStreamPartition);
metadataStore.delete(serializedSSPAsJson);
}
+ metadataStore.flush();
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
index 284c0bf..ddcc72b 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
@@ -84,6 +84,7 @@ public class RunIdGenerator {
String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
LOG.info("Writing the run id for this run as {}", runId);
metadataStore.put(CoordinationConstants.RUNID_STORE_KEY, runId.getBytes("UTF-8"));
+ metadataStore.flush();
} else {
runId = new String(metadataStore.get(CoordinationConstants.RUNID_STORE_KEY));
LOG.info("Read the run id for this run as {}", runId);
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index e01a4c6..24ce457 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -126,19 +126,6 @@ public class CoordinatorStreamStore implements MetadataStore {
@Override
public void put(String namespacedKey, byte[] value) {
- putWithoutFlush(namespacedKey, value);
- flush();
- }
-
- @Override
- public void putAll(Map<String, byte[]> entries) {
- for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
- putWithoutFlush(entry.getKey(), entry.getValue());
- }
- flush();
- }
-
- private void putWithoutFlush(String namespacedKey, byte[] value) {
// 1. Store the namespace and key into correct fields of the CoordinatorStreamKey and convert the key to bytes.
CoordinatorMessageKey coordinatorMessageKey = deserializeCoordinatorMessageKeyFromJson(namespacedKey);
CoordinatorStreamKeySerde keySerde = new CoordinatorStreamKeySerde(coordinatorMessageKey.getNamespace());
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
index 000e55a..f55f02f 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -190,6 +190,7 @@ public class LocalJobPlanner extends JobPlanner {
streamManager.createStreams(intStreams);
String streamCreatedMessage = "Streams created by processor " + processorId;
metadataStore.put(String.format(STREAM_CREATED_STATE_KEY, lockId), streamCreatedMessage.getBytes("UTF-8"));
+ metadataStore.flush();
distributedLock.unlock();
break;
} else {
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index e230b1a..377fa94 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -80,6 +80,7 @@ public class JobModelUtil {
byte[] jobModelSerializedAsBytes = jobModelSerializedAsString.getBytes(UTF_8);
String metadataStoreKey = getJobModelKey(jobModelVersion);
metadataStore.put(metadataStoreKey, jobModelSerializedAsBytes);
+ metadataStore.flush();
} catch (Exception e) {
throw new SamzaException(String.format("Exception occurred when storing JobModel: %s with version: %s.", jobModel, jobModelVersion), e);
}
diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
index ba5acb7..9a196c3 100644
--- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
@@ -145,6 +145,7 @@ public class StartpointManager {
try {
readWriteStore.put(toReadWriteStoreKey(ssp, taskName), objectMapper.writeValueAsBytes(startpoint));
+ readWriteStore.flush();
} catch (Exception ex) {
throw new SamzaException(String.format(
"Startpoint for SSP: %s and task: %s may not have been written to the metadata store.", ssp, taskName), ex);
@@ -208,6 +209,7 @@ public class StartpointManager {
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
readWriteStore.delete(toReadWriteStoreKey(ssp, taskName));
+ readWriteStore.flush();
}
/**
@@ -218,6 +220,7 @@ public class StartpointManager {
for (String key : readWriteKeys) {
readWriteStore.delete(key);
}
+ readWriteStore.flush();
}
/**
@@ -273,6 +276,7 @@ public class StartpointManager {
StartpointFanOutPerTask newFanOut = fanOuts.get(taskName);
fanOutStore.put(fanOutKey, objectMapper.writeValueAsBytes(newFanOut));
}
+ fanOutStore.flush();
for (SystemStreamPartition ssp : deleteKeys.keySet()) {
for (TaskName taskName : deleteKeys.get(ssp)) {
@@ -314,6 +318,7 @@ public class StartpointManager {
Preconditions.checkNotNull(taskName, "TaskName cannot be null");
fanOutStore.delete(toFanOutStoreKey(taskName));
+ fanOutStore.flush();
}
/**
@@ -324,6 +329,7 @@ public class StartpointManager {
for (String key : fanOutKeys) {
fanOutStore.delete(key);
}
+ fanOutStore.flush();
}
@VisibleForTesting
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
index e86e21a..4eed058 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -102,6 +102,7 @@ public class ChangelogStreamManager {
metadataStore.delete(taskName);
}
}
+ metadataStore.flush();
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index dfd0304..feaabba 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -324,6 +324,7 @@ public class ZkJobCoordinator implements JobCoordinator {
byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
configStore.put(entry.getKey(), serializedValue);
}
+ configStore.flush();
// fan out the startpoints
StartpointManager startpointManager = createStartpointManager();
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index b8c18fe..e2dbf3f 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -240,10 +240,12 @@ object JobModelManager extends Logging {
// taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = new util.HashMap[SystemStreamPartition, util.List[String]]()
+ val taskContainerMappings: util.Map[String, util.Map[String, TaskMode]] = new util.HashMap[String, util.Map[String, TaskMode]]()
+
for (container <- jobModel.getContainers.values()) {
for ((taskName, taskModel) <- container.getTasks) {
- info ("Storing task: %s and container ID: %s into metadata store" format(taskName.getTaskName, container.getId))
- taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, container.getTasks.get(taskName).getTaskMode)
+ taskContainerMappings.putIfAbsent(container.getId, new util.HashMap[String, TaskMode]())
+ taskContainerMappings.get(container.getId).put(taskName.getTaskName, container.getTasks.get(taskName).getTaskMode)
for (partition <- taskModel.getSystemStreamPartitions) {
if (!sspToTaskNameMap.containsKey(partition)) {
sspToTaskNameMap.put(partition, new util.ArrayList[String]())
@@ -253,10 +255,8 @@ object JobModelManager extends Logging {
}
}
- for ((ssp, taskNames) <- sspToTaskNameMap) {
- info ("Storing ssp: %s and task: %s into metadata store" format(ssp, taskNames))
- taskPartitionAssignmentManager.writeTaskPartitionAssignment(ssp, taskNames)
- }
+ taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings)
+ taskPartitionAssignmentManager.writeTaskPartitionAssignments(sspToTaskNameMap);
}
/**
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 357e8ae..34909e3 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -60,10 +60,13 @@ public class TestTaskAssignmentManager {
@Test
public void testTaskAssignmentManager() {
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1", "Task2", "2", "Task3", "0", "Task4", "1");
+ Map<String, Map<String, TaskMode>> taskContainerMappings = ImmutableMap.of(
+ "0", ImmutableMap.of("Task0", TaskMode.Active, "Task3", TaskMode.Active),
+ "1", ImmutableMap.of("Task1", TaskMode.Active, "Task4", TaskMode.Active),
+ "2", ImmutableMap.of("Task2", TaskMode.Active)
+ );
- for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
- taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
- }
+ taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings);
Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
@@ -75,10 +78,12 @@ public class TestTaskAssignmentManager {
@Test
public void testDeleteMappings() {
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1");
+ Map<String, Map<String, TaskMode>> taskContainerMappings = ImmutableMap.of(
+ "0", ImmutableMap.of("Task0", TaskMode.Active),
+ "1", ImmutableMap.of("Task1", TaskMode.Active)
+ );
- for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
- taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
- }
+ taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings);
Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
assertEquals(expectedMap, localMap);
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
index 1a7f0c9..1a0dc48 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
@@ -63,7 +63,7 @@ public class TestTaskPartitionAssignmentManager {
@Test
public void testReadAfterWrite() {
List<String> testTaskNames = ImmutableList.of("test-task1", "test-task2", "test-task3");
- taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames);
+ taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames));
Map<SystemStreamPartition, List<String>> expectedMapping = ImmutableMap.of(testSystemStreamPartition, testTaskNames);
Map<SystemStreamPartition, List<String>> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments();
@@ -74,7 +74,7 @@ public class TestTaskPartitionAssignmentManager {
@Test
public void testDeleteAfterWrite() {
List<String> testTaskNames = ImmutableList.of("test-task1", "test-task2", "test-task3");
- taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames);
+ taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames));
Map<SystemStreamPartition, List<String>> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments();
Assert.assertEquals(1, actualMapping.size());
@@ -94,12 +94,13 @@ public class TestTaskPartitionAssignmentManager {
SystemStreamPartition testSystemStreamPartition3 = new SystemStreamPartition(TEST_SYSTEM, "stream-3", PARTITION);
List<String> testTaskNames3 = ImmutableList.of("test-task6", "test-task7", "test-task8");
- taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition1, testTaskNames1);
- taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition2, testTaskNames2);
- taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition3, testTaskNames3);
+ taskPartitionAssignmentManager.writeTaskPartitionAssignments(
+ ImmutableMap.of(testSystemStreamPartition1, testTaskNames1, testSystemStreamPartition2, testTaskNames2,
+ testSystemStreamPartition3, testTaskNames3));
- Map<SystemStreamPartition, List<String>> expectedMapping = ImmutableMap.of(testSystemStreamPartition1, testTaskNames1,
- testSystemStreamPartition2, testTaskNames2, testSystemStreamPartition3, testTaskNames3);
+ Map<SystemStreamPartition, List<String>> expectedMapping =
+ ImmutableMap.of(testSystemStreamPartition1, testTaskNames1, testSystemStreamPartition2, testTaskNames2,
+ testSystemStreamPartition3, testTaskNames3);
Map<SystemStreamPartition, List<String>> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments();
Assert.assertEquals(expectedMapping, actualMapping);
@@ -108,14 +109,11 @@ public class TestTaskPartitionAssignmentManager {
@Test
public void testMultipleUpdatesReturnsTheMostRecentValue() {
List<String> testTaskNames1 = ImmutableList.of("test-task1", "test-task2", "test-task3");
-
- taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames1);
-
+ taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames1));
List<String> testTaskNames2 = ImmutableList.of("test-task4", "test-task5");
- taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames2);
-
+ taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames2));
List<String> testTaskNames3 = ImmutableList.of("test-task6", "test-task7", "test-task8");
- taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames3);
+ taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames3));
Map<SystemStreamPartition, List<String>> expectedMapping = ImmutableMap.of(testSystemStreamPartition, testTaskNames3);
Map<SystemStreamPartition, List<String>> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments();
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index d58cf18..fe94e83 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -270,7 +270,7 @@ public class TestJobModelManager {
when(mockJobModel.getContainers()).thenReturn(containerMapping);
when(mockGrouperMetadata.getPreviousTaskToProcessorAssignment()).thenReturn(new HashMap<>());
- Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMapping(Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMappings(Mockito.any());
JobModelManager.updateTaskAssignments(mockJobModel, mockTaskAssignmentManager, mockTaskPartitionAssignmentManager, mockGrouperMetadata);
@@ -289,18 +289,18 @@ public class TestJobModelManager {
// Verifications
Mockito.verify(mockJobModel, atLeast(1)).getContainers();
Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
- Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-1", "test-container-id", TaskMode.Active);
- Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-2", "test-container-id", TaskMode.Active);
- Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-3", "test-container-id", TaskMode.Active);
- Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-4", "test-container-id", TaskMode.Active);
+ Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMappings(ImmutableMap.of("test-container-id",
+ ImmutableMap.of("task-1", TaskMode.Active, "task-2", TaskMode.Active, "task-3", TaskMode.Active, "task-4", TaskMode.Active)));
// Verify that the old, stale partition mappings had been purged in the coordinator stream.
Mockito.verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions);
// Verify that the new task to partition assignment is stored in the coordinator stream.
- Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition1, ImmutableList.of("task-1"));
- Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition2, ImmutableList.of("task-2"));
- Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition3, ImmutableList.of("task-3"));
- Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition4, ImmutableList.of("task-4"));
+ Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(ImmutableMap.of(
+ testSystemStreamPartition1, ImmutableList.of("task-1"),
+ testSystemStreamPartition2, ImmutableList.of("task-2"),
+ testSystemStreamPartition3, ImmutableList.of("task-3"),
+ testSystemStreamPartition4, ImmutableList.of("task-4")
+ ));
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java
index 39bc583..ef24541 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java
@@ -51,6 +51,7 @@ public class TestRunIdGenerator {
verify(membership, Mockito.times(1)).registerProcessor();
verify(membership, Mockito.times(1)).getNumberOfProcessors();
verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+ verify(metadataStore, Mockito.times(1)).flush();
}
@Test
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
index e073a0e..5865de9 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
@@ -105,7 +105,6 @@ public class TestCoordinatorStreamStore {
Assert.assertEquals(value3, spyCoordinatorStreamStore.get(key3));
Assert.assertEquals(value4, spyCoordinatorStreamStore.get(key4));
Assert.assertEquals(value5, spyCoordinatorStreamStore.get(key5));
- Mockito.verify(spyCoordinatorStreamStore).flush(); // verify flush called only once during putAll
}
@Test
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 972a042..cef7906 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -457,6 +457,7 @@ public class TestLocalApplicationRunner {
verify(coordinationUtils, Mockito.times(1)).getLock(CoordinationConstants.RUNID_LOCK_ID);
verify(clusterMembership, Mockito.times(1)).getNumberOfProcessors();
verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+ verify(metadataStore, Mockito.times(1)).flush();
}
/**
@@ -483,6 +484,7 @@ public class TestLocalApplicationRunner {
verify(coordinationUtils, Mockito.times(0)).getClusterMembership();
verify(clusterMembership, Mockito.times(0)).getNumberOfProcessors();
verify(metadataStore, Mockito.times(0)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+ verify(metadataStore, Mockito.times(0)).flush();
}
private void prepareTestForRunId() throws Exception {