You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/12/08 20:26:00 UTC

[beam] branch master updated: [BEAM-11417] Use Cache with time eviction policy for commitCallbacks

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 80eff6c  [BEAM-11417] Use Cache with time eviction policy for commitCallbacks
     new a9a1c8e  Merge pull request #13507 from [BEAM-11417] Use Cache with time eviction policy for commitCallbacks
80eff6c is described below

commit 80eff6c571fd34c9a4b1024ff4201c1e2dd23dad
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Dec 8 11:00:20 2020 -0800

    [BEAM-11417] Use Cache with time eviction policy for commitCallbacks
---
 .../runners/dataflow/worker/StreamingDataflowWorker.java     | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 6c127c8..6771e04 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -138,6 +138,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.EvictingQueue;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -397,8 +399,11 @@ public class StreamingDataflowWorker {
       new WeightedBoundedQueue<>(
           MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize()));
 
-  // Map of tokens to commit callbacks.
-  private final ConcurrentMap<Long, Runnable> commitCallbacks = new ConcurrentHashMap<>();
+  // Cache of tokens to commit callbacks.
+  // A Cache with time-based eviction prevents a memory leak when callback ids are discarded by
+  // the Dataflow service; calling a commit callback is best-effort.
+  private final Cache<Long, Runnable> commitCallbacks =
+      CacheBuilder.newBuilder().expireAfterWrite(5L, TimeUnit.MINUTES).build();
 
   // Map of user state names to system state names.
   // TODO(drieber): obsolete stateNameMap. Use transformUserNameToStateFamily in
@@ -1200,12 +1205,13 @@ public class StreamingDataflowWorker {
 
   private void callFinalizeCallbacks(Windmill.WorkItem work) {
     for (Long callbackId : work.getSourceState().getFinalizeIdsList()) {
-      final Runnable callback = commitCallbacks.remove(callbackId);
+      final Runnable callback = commitCallbacks.getIfPresent(callbackId);
       // NOTE: It is possible the same callback id may be removed twice if
       // windmill restarts.
       // TODO: It is also possible for an earlier finalized id to be lost.
       // We should automatically discard all older callbacks for the same computation and key.
       if (callback != null) {
+        commitCallbacks.invalidate(callbackId);
         workUnitExecutor.forceExecute(
             () -> {
               try {