Posted to github@beam.apache.org by "scwhittle (via GitHub)" <gi...@apache.org> on 2023/04/20 08:54:09 UTC

[GitHub] [beam] scwhittle commented on a diff in pull request #23492: Add Windmill support for MultimapState

scwhittle commented on code in PR #23492:
URL: https://github.com/apache/beam/pull/23492#discussion_r1172286648


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java:
##########
@@ -1812,6 +1755,62 @@ true, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of()))
     assertTrue(entryUpdate.getDeleteAll());
   }
 
+  @Test
+  public void testMultimapFuzzTest() {
+    final String tag = "multimap";
+    StateTag<MultimapState<String, Integer>> addr =
+        StateTags.multimap(tag, StringUtf8Coder.of(), VarIntCoder.of());
+    MultimapState<String, Integer> multimapState = underTest.state(NAMESPACE, addr);
+
+    SettableFuture<Iterable<Map.Entry<ByteString, Iterable<Integer>>>> entriesFuture =
+        SettableFuture.create();
+    when(mockReader.multimapFetchAllFuture(
+            false, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of()))
+        .thenReturn(entriesFuture);
+
+    // to set up the multimap as cache complete
+    waitAndSet(entriesFuture, Collections.emptyList(), 30);
+    multimapState.entries().read();
+
+    Multimap<String, Integer> mirror = ArrayListMultimap.create();
+
+    Random rand = new Random();
+
+    final int ROUNDS = 100;
+    final int OPS_PER_ROUND = 2000;
+    final int NUM_KEY = 20;
+    for (int i = 0; i < ROUNDS; i++) {
+      for (int j = 0; j < OPS_PER_ROUND; j++) {
+        int op = rand.nextInt(100);
+        String key = "key" + rand.nextInt(NUM_KEY);
+        if (op < 50) {
+          // 50% add operation
+          Integer value = rand.nextInt();
+          multimapState.put(key, value);
+          mirror.put(key, value);
+        } else if (op < 95) {
+          // 45% remove key operation
+          multimapState.remove(key);
+          mirror.removeAll(key);
+        } else {
+          // 5% clear operation
+          multimapState.clear();
+          mirror.clear();
+        }
+      }
+      Iterable<String> read = multimapState.keys().read();
+      Set<String> bytes = mirror.keySet();
+      assertThat(read, Matchers.containsInAnyOrder(bytes.toArray()));
+      for (String key : multimapState.keys().read()) {
+        assertThat(
+            multimapState.get(key).read(), Matchers.containsInAnyOrder(mirror.get(key).toArray()));
+      }
+      Windmill.WorkItemCommitRequest.Builder commitBuilder =
+          Windmill.WorkItemCommitRequest.newBuilder();
+      underTest.persist(commitBuilder);

Review Comment:
   Can you also recreate the multimap after some rounds, so that it has to initialize from persisted state as necessary? As is, it is fully cached the whole time, so the test likely never merges in-memory state with persisted (on-disk) state.
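
   To illustrate the suggestion, here is a minimal, self-contained sketch of the loop shape. It deliberately does not use the Beam or Windmill classes: `ToyMultimapState`, `runFuzz`, and the `recreateEvery` parameter are hypothetical names invented for this example, and the toy state only approximates a cache sitting in front of a persisted store. The point is just that dropping the instance after a `persist()` forces later reads to go back to the persisted data instead of a complete in-memory cache.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

/** Hypothetical sketch only; ToyMultimapState is NOT Beam's multimap state implementation. */
public class RecreateFuzzSketch {

  /** Toy state: a lazily filled local cache in front of a shared "persisted" map. */
  static final class ToyMultimapState {
    private final Map<String, List<Integer>> persisted; // stands in for the backing store
    private final Map<String, List<Integer>> local = new HashMap<>(); // keys known locally
    private boolean cacheComplete = false; // true once the store no longer needs reading

    ToyMultimapState(Map<String, List<Integer>> persisted) {
      this.persisted = persisted;
    }

    /** Returns the locally cached values for a key, reading the store on first access. */
    private List<Integer> load(String key) {
      List<Integer> values = local.get(key);
      if (values == null) {
        values = new ArrayList<>();
        if (!cacheComplete && persisted.containsKey(key)) {
          values.addAll(persisted.get(key));
        }
        local.put(key, values);
      }
      return values;
    }

    void put(String key, int value) { load(key).add(value); }

    void remove(String key) { local.put(key, new ArrayList<>()); }

    void clear() { local.clear(); cacheComplete = true; }

    List<Integer> get(String key) { return new ArrayList<>(load(key)); }

    Set<String> keys() {
      Set<String> result = new HashSet<>();
      if (!cacheComplete) result.addAll(persisted.keySet());
      // Local knowledge overrides the store: empty means removed, non-empty means present.
      local.forEach((k, v) -> { if (v.isEmpty()) result.remove(k); else result.add(k); });
      return result;
    }

    void persist() {
      if (cacheComplete) persisted.clear();
      local.forEach((k, v) -> {
        if (v.isEmpty()) persisted.remove(k); else persisted.put(k, new ArrayList<>(v));
      });
    }
  }

  /** Runs the fuzz loop against a mirror; returns false on the first mismatch. */
  static boolean runFuzz(long seed, int rounds, int opsPerRound, int recreateEvery) {
    Random rand = new Random(seed);
    Map<String, List<Integer>> store = new HashMap<>();
    Map<String, List<Integer>> mirror = new HashMap<>();
    ToyMultimapState state = new ToyMultimapState(store);
    for (int i = 0; i < rounds; i++) {
      for (int j = 0; j < opsPerRound; j++) {
        int op = rand.nextInt(100);
        String key = "key" + rand.nextInt(20);
        if (op < 50) { // 50% add
          int value = rand.nextInt();
          state.put(key, value);
          mirror.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        } else if (op < 95) { // 45% remove key
          state.remove(key);
          mirror.remove(key);
        } else { // 5% clear
          state.clear();
          mirror.clear();
        }
      }
      if (!state.keys().equals(mirror.keySet())) return false;
      for (String key : mirror.keySet()) {
        List<Integer> actual = state.get(key);
        List<Integer> expected = new ArrayList<>(mirror.get(key));
        Collections.sort(actual);
        Collections.sort(expected);
        if (!actual.equals(expected)) return false;
      }
      state.persist();
      if ((i + 1) % recreateEvery == 0) {
        // Drop the cached instance so the next round starts from persisted data only.
        state = new ToyMultimapState(store);
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(runFuzz(42L, 20, 200, 5));
  }
}
```

   In the real test, the equivalent step would presumably be to obtain a fresh state object after `persist` and have the mocked reader return the persisted entries. How to wire that up depends on the test harness, so it is left out here.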


