You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/12/08 20:26:00 UTC
[beam] branch master updated: [BEAM-11417] Use Cache with time eviction policy for commitCallbacks
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 80eff6c [BEAM-11417] Use Cache with time eviction policy for commitCallbacks
new a9a1c8e Merge pull request #13507 from [BEAM-11417] Use Cache with time eviction policy for commitCallbacks
80eff6c is described below
commit 80eff6c571fd34c9a4b1024ff4201c1e2dd23dad
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Dec 8 11:00:20 2020 -0800
[BEAM-11417] Use Cache with time eviction policy for commitCallbacks
---
.../runners/dataflow/worker/StreamingDataflowWorker.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 6c127c8..6771e04 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -138,6 +138,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.EvictingQueue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -397,8 +399,11 @@ public class StreamingDataflowWorker {
new WeightedBoundedQueue<>(
MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize()));
- // Map of tokens to commit callbacks.
- private final ConcurrentMap<Long, Runnable> commitCallbacks = new ConcurrentHashMap<>();
+ // Cache of tokens to commit callbacks.
+ // Using a Cache with a time-based eviction policy helps prevent a memory leak when callback ids
+ // are discarded by the Dataflow service; calling a commit callback is best-effort anyway.
+ private final Cache<Long, Runnable> commitCallbacks =
+ CacheBuilder.newBuilder().expireAfterWrite(5L, TimeUnit.MINUTES).build();
// Map of user state names to system state names.
// TODO(drieber): obsolete stateNameMap. Use transformUserNameToStateFamily in
@@ -1200,12 +1205,13 @@ public class StreamingDataflowWorker {
private void callFinalizeCallbacks(Windmill.WorkItem work) {
for (Long callbackId : work.getSourceState().getFinalizeIdsList()) {
- final Runnable callback = commitCallbacks.remove(callbackId);
+ final Runnable callback = commitCallbacks.getIfPresent(callbackId);
// NOTE: It is possible the same callback id may be removed twice if
// windmill restarts.
// TODO: It is also possible for an earlier finalized id to be lost.
// We should automatically discard all older callbacks for the same computation and key.
if (callback != null) {
+ commitCallbacks.invalidate(callbackId);
workUnitExecutor.forceExecute(
() -> {
try {