You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/03/23 04:52:43 UTC

[beam] branch master updated: [BEAM-13015] Clarify ownership of the list for state caching across bundles (#16982)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ef6560  [BEAM-13015] Clarify ownership of the list for state caching across bundles (#16982)
7ef6560 is described below

commit 7ef656070e63f9b41de92aa9ee7a33906b40cb09
Author: scwhittle <sc...@users.noreply.github.com>
AuthorDate: Wed Mar 23 05:51:18 2022 +0100

    [BEAM-13015] Clarify ownership of the list for state caching across bundles (#16982)
    
    * Avoid copy when adding list with clearAndAppend.
    Also fixes issue where weighed object was different than cached object.
    
    * Update sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
    
    * Update sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
    
    * Update sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
    
    Fix spotless
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
---
 .../src/main/java/org/apache/beam/fn/harness/state/BagUserState.java | 2 ++
 .../java/org/apache/beam/fn/harness/state/MultimapUserState.java     | 5 ++++-
 .../org/apache/beam/fn/harness/state/StateFetchingIterators.java     | 4 ++--
 3 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
index b3f6d88..8f8f206 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
@@ -141,6 +141,8 @@ public class BagUserState<T> {
 
     // Modify the underlying cached state depending on the mutations performed
     if (isCleared) {
+      // Note this takes ownership of newValues. This object is no longer used after it has been
+      // closed.
       oldValues.clearAndAppend(newValues);
     } else {
       oldValues.append(newValues);
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
index 5a7b157..f98290f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
@@ -317,7 +317,8 @@ public class MultimapUserState<K, V> {
     }
 
     if (isCleared) {
-      // This will clear all keys and values since values is a sub-cache of keys.
+      // This will clear all keys and values since values is a sub-cache of keys. Note this
+      // takes ownership of pendingAddsKeys. This object is no longer used after it has been closed.
       persistedKeys.clearAndAppend(pendingAddsKeys);
 
       // Since the map was cleared we can add all the values that are pending since we know
@@ -325,6 +326,8 @@ public class MultimapUserState<K, V> {
       for (Map.Entry<Object, KV<K, List<V>>> entry : pendingAdds.entrySet()) {
         CachingStateIterable<V> iterable =
             getPersistedValues(entry.getKey(), entry.getValue().getKey());
+        // Note this takes ownership of the list but this object is no longer used after it has
+        // been closed.
         iterable.clearAndAppend(entry.getValue().getValue());
       }
     } else {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
index 24d50f5..85d0b3d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
@@ -277,7 +277,7 @@ public class StateFetchingIterators {
     }
 
     /**
-     * Clears the cached iterable and appends the set of values.
+     * Clears the cached iterable and appends the set of values, taking ownership of the list.
      *
      * <p>Mutations over the Beam Fn State API must have been performed before any future lookups.
      *
@@ -287,7 +287,7 @@ public class StateFetchingIterators {
     public void clearAndAppend(List<T> values) {
       cache.put(
           IterableCacheKey.INSTANCE,
-          new MutatedBlocks<>(Block.mutatedBlock(new ArrayList<>(values), Caches.weigh(values))));
+          new MutatedBlocks<>(Block.mutatedBlock(values, Caches.weigh(values))));
     }
 
     @Override