You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/07/23 06:09:00 UTC
[samza] branch master updated: SAMZA-2278: Delete all for startpoints and fan outs. (#1110)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new b1d9a0c SAMZA-2278: Delete all for startpoints and fan outs. (#1110)
b1d9a0c is described below
commit b1d9a0c0ff76762b3a68664052c7e07e33b9e41f
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Mon Jul 22 23:08:56 2019 -0700
SAMZA-2278: Delete all for startpoints and fan outs. (#1110)
---
.../apache/samza/startpoint/StartpointManager.java | 20 ++++++
.../samza/startpoint/TestStartpointManager.java | 75 ++++++++++++++++++++++
2 files changed, 95 insertions(+)
diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
index 0c070cb..ba5acb7 100644
--- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
@@ -211,6 +211,16 @@ public class StartpointManager {
}
/**
+ * Deletes every {@link Startpoint} in the read-write store, one key at a time; the fan out store is not modified. NOTE(review): keys are deleted while iterating {@code readWriteStore.all().keySet()}, which assumes {@code all()} returns a snapshot rather than a live view - confirm.
+ */
+ public void deleteAllStartpoints() {
+ Set<String> readWriteKeys = readWriteStore.all().keySet();
+ for (String key : readWriteKeys) {
+ readWriteStore.delete(key);
+ }
+ }
+
+ /**
* The Startpoints that are written to with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with
* {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace
* to a "fan out" namespace.
@@ -306,6 +316,16 @@ public class StartpointManager {
fanOutStore.delete(toFanOutStoreKey(taskName));
}
+ /**
+ * Deletes every fanned out {@link Startpoint} in the fan out store, one key at a time; the read-write store is not modified. NOTE(review): keys are deleted while iterating {@code fanOutStore.all().keySet()}, which assumes {@code all()} returns a snapshot rather than a live view - confirm.
+ */
+ public void removeAllFanOuts() {
+ Set<String> fanOutKeys = fanOutStore.all().keySet();
+ for (String key : fanOutKeys) {
+ fanOutStore.delete(key);
+ }
+ }
+
@VisibleForTesting
MetadataStore getReadWriteStore() {
return readWriteStore;
diff --git a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
index d615c54..d5274a8 100644
--- a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
+++ b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
@@ -311,4 +311,79 @@ public class TestStartpointManager {
Assert.assertTrue(startpointManager.getFanOutForTask(taskName).isEmpty());
}
}
+
+ @Test
+ public void testRemoveAllFanOuts() throws IOException {
+ SystemStreamPartition sspBroadcast = new SystemStreamPartition("mockSystem1", "mockStream1", new Partition(2));
+ SystemStreamPartition sspSingle = new SystemStreamPartition("mockSystem2", "mockStream2", new Partition(3));
+
+ TaskName taskWithNonBroadcast = new TaskName("t1");
+
+ List<TaskName> tasks =
+ ImmutableList.of(new TaskName("t0"), taskWithNonBroadcast, new TaskName("t2"), new TaskName("t3"), new TaskName("t4"), new TaskName("t5"));
+
+ Map<TaskName, Set<SystemStreamPartition>> taskToSSPs = tasks.stream()
+ .collect(Collectors
+ .toMap(task -> task, task -> task.equals(taskWithNonBroadcast) ? ImmutableSet.of(sspBroadcast, sspSingle) : ImmutableSet.of(sspBroadcast)));
+
+ StartpointSpecific startpoint42 = new StartpointSpecific("42");
+
+ startpointManager.writeStartpoint(sspBroadcast, startpoint42);
+ startpointManager.writeStartpoint(sspSingle, startpoint42);
+
+ // Fanning out should remap startpoint42, written under sspBroadcast, to every task (each task's SSP set includes sspBroadcast)
+ Map<TaskName, Map<SystemStreamPartition, Startpoint>> tasksFannedOutTo = startpointManager.fanOut(taskToSSPs);
+ Assert.assertEquals(tasks.size(), tasksFannedOutTo.size());
+ Assert.assertTrue(tasksFannedOutTo.keySet().containsAll(tasks));
+ Assert.assertFalse("Should be deleted after fan out", startpointManager.readStartpoint(sspBroadcast).isPresent());
+ Assert.assertFalse("Should be deleted after fan out", startpointManager.readStartpoint(sspSingle).isPresent());
+
+ startpointManager.removeAllFanOuts();
+
+ // Write startpoints back into the read-write store. NOTE(review): these writes happen after removeAllFanOuts(), so the asserts below do not prove that pre-existing startpoints survive that call
+ startpointManager.writeStartpoint(sspBroadcast, startpoint42);
+ startpointManager.writeStartpoint(sspSingle, startpoint42);
+
+ Assert.assertEquals(0, startpointManager.getFanOutStore().all().size());
+ Assert.assertTrue("Should not be deleted after remove all fan outs", startpointManager.readStartpoint(sspBroadcast).isPresent());
+ Assert.assertTrue("Should not be deleted after remove all fan outs", startpointManager.readStartpoint(sspSingle).isPresent());
+ }
+
+ @Test
+ public void testDeleteAllStartpoints() throws IOException {
+ SystemStreamPartition sspBroadcast = new SystemStreamPartition("mockSystem1", "mockStream1", new Partition(2));
+ SystemStreamPartition sspSingle = new SystemStreamPartition("mockSystem2", "mockStream2", new Partition(3));
+
+ TaskName taskWithNonBroadcast = new TaskName("t1");
+
+ List<TaskName> tasks =
+ ImmutableList.of(new TaskName("t0"), taskWithNonBroadcast, new TaskName("t2"), new TaskName("t3"), new TaskName("t4"), new TaskName("t5"));
+
+ Map<TaskName, Set<SystemStreamPartition>> taskToSSPs = tasks.stream()
+ .collect(Collectors
+ .toMap(task -> task, task -> task.equals(taskWithNonBroadcast) ? ImmutableSet.of(sspBroadcast, sspSingle) : ImmutableSet.of(sspBroadcast)));
+
+ StartpointSpecific startpoint42 = new StartpointSpecific("42");
+
+ startpointManager.writeStartpoint(sspBroadcast, startpoint42);
+ startpointManager.writeStartpoint(sspSingle, startpoint42);
+
+ // Fanning out should remap startpoint42, written under sspBroadcast, to every task (each task's SSP set includes sspBroadcast)
+ Map<TaskName, Map<SystemStreamPartition, Startpoint>> tasksFannedOutTo = startpointManager.fanOut(taskToSSPs);
+ Assert.assertEquals(tasks.size(), tasksFannedOutTo.size());
+ Assert.assertTrue(tasksFannedOutTo.keySet().containsAll(tasks));
+ Assert.assertFalse("Should be deleted after fan out", startpointManager.readStartpoint(sspBroadcast).isPresent());
+ Assert.assertFalse("Should be deleted after fan out", startpointManager.readStartpoint(sspSingle).isPresent());
+
+ // Re-populate the read-write store, which the asserts above showed no longer holds these startpoints after fan out
+ startpointManager.writeStartpoint(sspBroadcast, startpoint42);
+ startpointManager.writeStartpoint(sspSingle, startpoint42);
+ Assert.assertEquals(2, startpointManager.getReadWriteStore().all().size());
+
+ startpointManager.deleteAllStartpoints();
+ Assert.assertEquals(0, startpointManager.getReadWriteStore().all().size());
+
+ // deleteAllStartpoints() must leave the fan out store untouched: it should still hold one entry per task
+ Assert.assertEquals(tasks.size(), startpointManager.getFanOutStore().all().size());
+ }
}