You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/03/23 04:52:43 UTC
[beam] branch master updated: [BEAM-13015] Clarify ownership of the list for state caching across bundles (#16982)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7ef6560 [BEAM-13015] Clarify ownership of the list for state caching across bundles (#16982)
7ef6560 is described below
commit 7ef656070e63f9b41de92aa9ee7a33906b40cb09
Author: scwhittle <sc...@users.noreply.github.com>
AuthorDate: Wed Mar 23 05:51:18 2022 +0100
[BEAM-13015] Clarify ownership of the list for state caching across bundles (#16982)
* Avoid copy when adding list with clearAndAppend.
Also fixes issue where weighed object was different than cached object.
* Update sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
* Update sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
* Update sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
Fix spotless
Co-authored-by: Lukasz Cwik <lc...@google.com>
---
.../src/main/java/org/apache/beam/fn/harness/state/BagUserState.java | 2 ++
.../java/org/apache/beam/fn/harness/state/MultimapUserState.java | 5 ++++-
.../org/apache/beam/fn/harness/state/StateFetchingIterators.java | 4 ++--
3 files changed, 8 insertions(+), 3 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
index b3f6d88..8f8f206 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
@@ -141,6 +141,8 @@ public class BagUserState<T> {
// Modify the underlying cached state depending on the mutations performed
if (isCleared) {
+ // Note this takes ownership of newValues. This object is no longer used after it has been
+ // closed.
oldValues.clearAndAppend(newValues);
} else {
oldValues.append(newValues);
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
index 5a7b157..f98290f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
@@ -317,7 +317,8 @@ public class MultimapUserState<K, V> {
}
if (isCleared) {
- // This will clear all keys and values since values is a sub-cache of keys.
+ // This will clear all keys and values since values is a sub-cache of keys. Note this
+ // takes ownership of pendingAddsKeys. This object is no longer used after it has been closed.
persistedKeys.clearAndAppend(pendingAddsKeys);
// Since the map was cleared we can add all the values that are pending since we know
@@ -325,6 +326,8 @@ public class MultimapUserState<K, V> {
for (Map.Entry<Object, KV<K, List<V>>> entry : pendingAdds.entrySet()) {
CachingStateIterable<V> iterable =
getPersistedValues(entry.getKey(), entry.getValue().getKey());
+ // Note this takes ownership of the list but this object is no longer used after it has
+ // been closed.
iterable.clearAndAppend(entry.getValue().getValue());
}
} else {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
index 24d50f5..85d0b3d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
@@ -277,7 +277,7 @@ public class StateFetchingIterators {
}
/**
- * Clears the cached iterable and appends the set of values.
+ * Clears the cached iterable and appends the set of values, taking ownership of the list.
*
* <p>Mutations over the Beam Fn State API must have been performed before any future lookups.
*
@@ -287,7 +287,7 @@ public class StateFetchingIterators {
public void clearAndAppend(List<T> values) {
cache.put(
IterableCacheKey.INSTANCE,
- new MutatedBlocks<>(Block.mutatedBlock(new ArrayList<>(values), Caches.weigh(values))));
+ new MutatedBlocks<>(Block.mutatedBlock(values, Caches.weigh(values))));
}
@Override