You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/07/23 06:09:00 UTC

[samza] branch master updated: SAMZA-2278: Delete all for startpoints and fan outs. (#1110)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new b1d9a0c  SAMZA-2278: Delete all for startpoints and fan outs. (#1110)
b1d9a0c is described below

commit b1d9a0c0ff76762b3a68664052c7e07e33b9e41f
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Mon Jul 22 23:08:56 2019 -0700

    SAMZA-2278: Delete all for startpoints and fan outs. (#1110)
---
 .../apache/samza/startpoint/StartpointManager.java | 20 ++++++
 .../samza/startpoint/TestStartpointManager.java    | 75 ++++++++++++++++++++++
 2 files changed, 95 insertions(+)

diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
index 0c070cb..ba5acb7 100644
--- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
@@ -211,6 +211,16 @@ public class StartpointManager {
   }
 
   /**
+   * Deletes all {@link Startpoint}s
+   */
+  public void deleteAllStartpoints() {
+    Set<String> readWriteKeys = readWriteStore.all().keySet();
+    for (String key : readWriteKeys) {
+      readWriteStore.delete(key);
+    }
+  }
+
+  /**
    * The Startpoints that are written to with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with
    * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace
    * to a "fan out" namespace.
@@ -306,6 +316,16 @@ public class StartpointManager {
     fanOutStore.delete(toFanOutStoreKey(taskName));
   }
 
+  /**
+   * Deletes all fanned out {@link Startpoint}s
+   */
+  public void removeAllFanOuts() {
+    Set<String> fanOutKeys = fanOutStore.all().keySet();
+    for (String key : fanOutKeys) {
+      fanOutStore.delete(key);
+    }
+  }
+
   @VisibleForTesting
   MetadataStore getReadWriteStore() {
     return readWriteStore;
diff --git a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
index d615c54..d5274a8 100644
--- a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
+++ b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
@@ -311,4 +311,79 @@ public class TestStartpointManager {
       Assert.assertTrue(startpointManager.getFanOutForTask(taskName).isEmpty());
     }
   }
+
+  @Test
+  public void testRemoveAllFanOuts() throws IOException {
+    SystemStreamPartition sspBroadcast = new SystemStreamPartition("mockSystem1", "mockStream1", new Partition(2));
+    SystemStreamPartition sspSingle = new SystemStreamPartition("mockSystem2", "mockStream2", new Partition(3));
+
+    TaskName taskWithNonBroadcast = new TaskName("t1");
+
+    List<TaskName> tasks =
+        ImmutableList.of(new TaskName("t0"), taskWithNonBroadcast, new TaskName("t2"), new TaskName("t3"), new TaskName("t4"), new TaskName("t5"));
+
+    Map<TaskName, Set<SystemStreamPartition>> taskToSSPs = tasks.stream()
+        .collect(Collectors
+            .toMap(task -> task, task -> task.equals(taskWithNonBroadcast) ? ImmutableSet.of(sspBroadcast, sspSingle) : ImmutableSet.of(sspBroadcast)));
+
+    StartpointSpecific startpoint42 = new StartpointSpecific("42");
+
+    startpointManager.writeStartpoint(sspBroadcast, startpoint42);
+    startpointManager.writeStartpoint(sspSingle, startpoint42);
+
+    // startpoint42 should remap with key sspBroadcast to all tasks + sspBroadcast
+    Map<TaskName, Map<SystemStreamPartition, Startpoint>> tasksFannedOutTo = startpointManager.fanOut(taskToSSPs);
+    Assert.assertEquals(tasks.size(), tasksFannedOutTo.size());
+    Assert.assertTrue(tasksFannedOutTo.keySet().containsAll(tasks));
+    Assert.assertFalse("Should be deleted after fan out", startpointManager.readStartpoint(sspBroadcast).isPresent());
+    Assert.assertFalse("Should be deleted after fan out", startpointManager.readStartpoint(sspSingle).isPresent());
+
+    startpointManager.removeAllFanOuts();
+
+    // Write back to ensure removing all fan outs doesn't remove all startpoints
+    startpointManager.writeStartpoint(sspBroadcast, startpoint42);
+    startpointManager.writeStartpoint(sspSingle, startpoint42);
+
+    Assert.assertEquals(0, startpointManager.getFanOutStore().all().size());
+    Assert.assertTrue("Should not be deleted after remove all fan outs", startpointManager.readStartpoint(sspBroadcast).isPresent());
+    Assert.assertTrue("Should not be deleted after remove all fan outs", startpointManager.readStartpoint(sspSingle).isPresent());
+  }
+
+  @Test
+  public void testDeleteAllStartpoints() throws IOException {
+    SystemStreamPartition sspBroadcast = new SystemStreamPartition("mockSystem1", "mockStream1", new Partition(2));
+    SystemStreamPartition sspSingle = new SystemStreamPartition("mockSystem2", "mockStream2", new Partition(3));
+
+    TaskName taskWithNonBroadcast = new TaskName("t1");
+
+    List<TaskName> tasks =
+        ImmutableList.of(new TaskName("t0"), taskWithNonBroadcast, new TaskName("t2"), new TaskName("t3"), new TaskName("t4"), new TaskName("t5"));
+
+    Map<TaskName, Set<SystemStreamPartition>> taskToSSPs = tasks.stream()
+        .collect(Collectors
+            .toMap(task -> task, task -> task.equals(taskWithNonBroadcast) ? ImmutableSet.of(sspBroadcast, sspSingle) : ImmutableSet.of(sspBroadcast)));
+
+    StartpointSpecific startpoint42 = new StartpointSpecific("42");
+
+    startpointManager.writeStartpoint(sspBroadcast, startpoint42);
+    startpointManager.writeStartpoint(sspSingle, startpoint42);
+
+    // startpoint42 should remap with key sspBroadcast to all tasks + sspBroadcast
+    Map<TaskName, Map<SystemStreamPartition, Startpoint>> tasksFannedOutTo = startpointManager.fanOut(taskToSSPs);
+    Assert.assertEquals(tasks.size(), tasksFannedOutTo.size());
+    Assert.assertTrue(tasksFannedOutTo.keySet().containsAll(tasks));
+    Assert.assertFalse("Should be deleted after fan out", startpointManager.readStartpoint(sspBroadcast).isPresent());
+    Assert.assertFalse("Should be deleted after fan out", startpointManager.readStartpoint(sspSingle).isPresent());
+
+    // Re-populate startpoints after fan out
+    startpointManager.writeStartpoint(sspBroadcast, startpoint42);
+    startpointManager.writeStartpoint(sspSingle, startpoint42);
+    Assert.assertEquals(2, startpointManager.getReadWriteStore().all().size());
+
+    startpointManager.deleteAllStartpoints();
+    Assert.assertEquals(0, startpointManager.getReadWriteStore().all().size());
+
+    // Fan outs should be untouched
+    Assert.assertEquals(tasks.size(), startpointManager.getFanOutStore().all().size());
+  }
 }