You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2018/08/22 00:44:15 UTC

[beam] branch master updated: [BEAM-5110] Explicitly count the references for BatchFlinkExecutableStageContext … (#6189)

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c41e0a  [BEAM-5110] Explicitly count the references for BatchFlinkExecutableStageContext … (#6189)
7c41e0a is described below

commit 7c41e0a915083bd3b1fe52c2a417fa38a00e6463
Author: Ankur <an...@users.noreply.github.com>
AuthorDate: Tue Aug 21 17:44:09 2018 -0700

    [BEAM-5110] Explicitly count the references for BatchFlinkExecutableStageContext … (#6189)
    
    * Explicitly count the references for BatchFlinkExecutableStageContext and clean them after TTL
    
    * Separating out ReferenceCountingFlinkExecutableStageContextFactory
    
    * Making ReferenceCountingFlinkExecutableStageFactory serializable
---
 .../FlinkBatchExecutableStageContext.java          |  29 +--
 .../functions/FlinkExecutableStageContext.java     |   6 +-
 .../functions/FlinkExecutableStageFunction.java    |   4 +-
 ...CountingFlinkExecutableStageContextFactory.java | 229 +++++++++++++++++++++
 .../streaming/ExecutableStageDoFnOperator.java     |   1 +
 ...tingFlinkExecutableStageContextFactoryTest.java |  80 +++++++
 6 files changed, 323 insertions(+), 26 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
index 9f7c171..5ba04c6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import java.io.IOException;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
@@ -35,7 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Implementation of a {@link FlinkExecutableStageContext} for batch jobs. */
-class FlinkBatchExecutableStageContext implements FlinkExecutableStageContext {
+class FlinkBatchExecutableStageContext implements FlinkExecutableStageContext, AutoCloseable {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchExecutableStageContext.class);
 
   private final JobBundleFactory jobBundleFactory;
@@ -68,32 +65,20 @@ class FlinkBatchExecutableStageContext implements FlinkExecutableStageContext {
   }
 
   @Override
-  protected void finalize() throws Exception {
+  public void close() throws Exception {
     jobBundleFactory.close();
   }
 
   enum BatchFactory implements Factory {
-    INSTANCE;
+    REFERENCE_COUNTING;
 
-    @SuppressWarnings("Immutable") // observably immutable
-    private final LoadingCache<JobInfo, FlinkBatchExecutableStageContext> cachedContexts;
-
-    BatchFactory() {
-      cachedContexts =
-          CacheBuilder.newBuilder()
-              .weakValues()
-              .build(
-                  new CacheLoader<JobInfo, FlinkBatchExecutableStageContext>() {
-                    @Override
-                    public FlinkBatchExecutableStageContext load(JobInfo jobInfo) throws Exception {
-                      return create(jobInfo);
-                    }
-                  });
-    }
+    private static final ReferenceCountingFlinkExecutableStageContextFactory actualFactory =
+        ReferenceCountingFlinkExecutableStageContextFactory.create(
+            FlinkBatchExecutableStageContext::create);
 
     @Override
     public FlinkExecutableStageContext get(JobInfo jobInfo) {
-      return cachedContexts.getUnchecked(jobInfo);
+      return actualFactory.get(jobInfo);
     }
   }
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
index a1e516c..ba59b53 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
@@ -25,18 +25,20 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.flink.api.common.functions.RuntimeContext;
 
 /** The Flink context required in order to execute {@link ExecutableStage stages}. */
-public interface FlinkExecutableStageContext {
+public interface FlinkExecutableStageContext extends AutoCloseable {
 
   /**
    * Creates {@link FlinkExecutableStageContext} instances. Serializable so that factories can be
    * defined at translation time and distributed to TaskManagers.
    */
   interface Factory extends Serializable {
+
+    /** Get or create {@link FlinkExecutableStageContext} for given {@link JobInfo}. */
     FlinkExecutableStageContext get(JobInfo jobInfo);
   }
 
   static Factory batchFactory() {
-    return FlinkBatchExecutableStageContext.BatchFactory.INSTANCE;
+    return FlinkBatchExecutableStageContext.BatchFactory.REFERENCE_COUNTING;
   }
 
   StageBundleFactory getStageBundleFactory(ExecutableStage executableStage);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index 3da58e9..c501274 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -65,8 +65,8 @@ public class FlinkExecutableStageFunction<InputT>
 
   // Worker-local fields. These should only be constructed and consumed on Flink TaskManagers.
   private transient RuntimeContext runtimeContext;
-  private transient FlinkExecutableStageContext stageContext;
   private transient StateRequestHandler stateRequestHandler;
+  private transient FlinkExecutableStageContext stageContext;
   private transient StageBundleFactory stageBundleFactory;
   private transient BundleProgressHandler progressHandler;
 
@@ -127,7 +127,7 @@ public class FlinkExecutableStageFunction<InputT>
   @Override
   public void close() throws Exception {
     try (AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
-    // Remove the reference to stageContext and make stageContext available for garbage collection.
+    try (AutoCloseable closable = stageContext) {}
     stageContext = null;
   }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
new file mode 100644
index 0000000..5121038
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
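+/**
+ * A {@link FlinkExecutableStageContext.Factory} that keeps one {@link FlinkExecutableStageContext}
+ * per job id, counts the references handed out for it, and closes the context once the last
+ * reference has been released.
+ */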
+public class ReferenceCountingFlinkExecutableStageContextFactory
+    implements FlinkExecutableStageContext.Factory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReferenceCountingFlinkExecutableStageContextFactory.class);
+  private static final int TTL_IN_SECONDS = 30;
+  private static final int MAX_RETRY = 3;
+
+  private final Creator creator;
+  private transient volatile ScheduledExecutorService executor;
+  private transient volatile ConcurrentHashMap<String, WrappedContext> keyRegistry;
+
+  /**
+   * Creates a factory that builds its contexts with the given {@link Creator}.
+   *
+   * @param creator invoked to build the underlying context when none is cached for a job id
+   * @return a new factory instance
+   */
+  public static ReferenceCountingFlinkExecutableStageContextFactory create(Creator creator) {
+    return new ReferenceCountingFlinkExecutableStageContextFactory(creator);
+  }
+
+  // Private: instances are obtained through create(Creator). The transient executor and
+  // keyRegistry fields are not set here; they are initialized lazily on first use.
+  private ReferenceCountingFlinkExecutableStageContextFactory(Creator creator) {
+    this.creator = creator;
+  }
+
+  /**
+   * Returns the shared context for the given job, creating it through the {@link Creator} if none
+   * is cached, and increments its reference count.
+   *
+   * @param jobInfo the job whose {@link JobInfo#jobId()} keys the cache
+   * @return the reference-counted wrapper around the job's context
+   * @throws RuntimeException if the creator fails, or if every one of the {@code MAX_RETRY}
+   *     attempts observed a tombstoned wrapper
+   */
+  @Override
+  public FlinkExecutableStageContext get(JobInfo jobInfo) {
+    // A retry is needed when an existing wrapper is picked from the cache but, by the time we
+    // read wrapper.referenceCount, the wrapper has been tombstoned by a pending release task.
+    // This race is highly unlikely because the release only runs after the TTL, and no
+    // systematic coding practice triggers it. Even in the unlikely case that it happens, the
+    // retry obtains a valid context.
+    // Note: There is no leak in this logic as the cleanup is only done in release.
+    // In case of a usage error where release is called before the corresponding get finishes,
+    // release might throw an error. If release did not throw an error, then we can be sure that
+    // the state of the system remains valid and appropriate cleanup will be done at TTL.
+    for (int retry = 0; retry < MAX_RETRY; retry++) {
+      // ConcurrentHashMap will handle the thread safety at the creation time.
+      WrappedContext wrapper =
+          getCache()
+              .computeIfAbsent(
+                  jobInfo.jobId(),
+                  jobId -> {
+                    try {
+                      return new WrappedContext(jobInfo, creator.apply(jobInfo));
+                    } catch (Exception e) {
+                      throw new RuntimeException(
+                          "Unable to create context for job " + jobInfo.jobId(), e);
+                    }
+                  });
+      // Take a lock on wrapper before modifying reference count.
+      // referenceCount == null is used as a tombstone for the wrapper.
+      synchronized (wrapper) {
+        if (wrapper.referenceCount != null) {
+          // The wrapper is still valid.
+          // Release has not yet got the lock and has not yet removed the wrapper.
+          wrapper.referenceCount.incrementAndGet();
+          return wrapper;
+        }
+      }
+      // The wrapper was tombstoned; loop again to look up or create a fresh one.
+    }
+
+    throw new RuntimeException(
+        String.format(
+            "Max retry %s exhausted while creating Context for job %s",
+            MAX_RETRY, jobInfo.jobId()));
+  }
+
+  /**
+   * Schedules a delayed {@link #release} of the wrapper currently cached for the given job.
+   *
+   * @param jobInfo the job whose cached wrapper should be released after the TTL
+   * @throws IllegalStateException if no wrapper is cached for the job
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void scheduleRelease(JobInfo jobInfo) {
+    String jobId = jobInfo.jobId();
+    WrappedContext cached = getCache().get(jobId);
+    Preconditions.checkState(cached != null, "Releasing context for unknown job: " + jobId);
+    // Defer the actual release by the TTL so the context can still be reused in the meantime.
+    getExecutor().schedule(() -> release(cached), TTL_IN_SECONDS, TimeUnit.SECONDS);
+  }
+
+  /** Returns the job-id-to-wrapper registry, creating it on first use. */
+  private ConcurrentHashMap<String, WrappedContext> getCache() {
+    // The field is transient, so it is null after deserialization and must be created on demand.
+    ConcurrentHashMap<String, WrappedContext> registry = keyRegistry;
+    if (registry == null) {
+      synchronized (this) {
+        // Re-read under the lock: another thread may have initialized it in the meantime.
+        registry = keyRegistry;
+        if (registry == null) {
+          registry = new ConcurrentHashMap<>();
+          keyRegistry = registry;
+        }
+      }
+    }
+    return registry;
+  }
+
+  /** Returns the single-threaded daemon scheduler used for delayed releases, creating it lazily. */
+  private ScheduledExecutorService getExecutor() {
+    // The field is transient, so it is null after deserialization and must be created on demand.
+    ScheduledExecutorService current = executor;
+    if (current == null) {
+      synchronized (this) {
+        // Re-read under the lock: another thread may have initialized it in the meantime.
+        current = executor;
+        if (current == null) {
+          current =
+              Executors.newScheduledThreadPool(
+                  1, new ThreadFactoryBuilder().setDaemon(true).build());
+          executor = current;
+        }
+      }
+    }
+    return current;
+  }
+
+  /**
+   * Drops one reference to the given context. When that was the last reference, the wrapper is
+   * tombstoned, removed from the cache and the underlying context is closed.
+   *
+   * @param context a context previously returned by {@link #get(JobInfo)}
+   * @throws IllegalStateException if the context has already been fully released
+   */
+  @VisibleForTesting
+  void release(FlinkExecutableStageContext context) {
+    // Only wrappers handed out by get() are expected here; not meant to be called from outside.
+    WrappedContext wrapper = (WrappedContext) context;
+    synchronized (wrapper) {
+      // A null referenceCount is the tombstone set below. Seeing it here means release was called
+      // more often than get; fail with a descriptive error rather than a NullPointerException.
+      if (wrapper.referenceCount == null) {
+        throw new IllegalStateException(
+            "Context for job " + wrapper.jobInfo.jobId() + " has already been released");
+      }
+      if (wrapper.referenceCount.decrementAndGet() == 0) {
+        // Tombstone wrapper so that a concurrent get() does not hand it out again.
+        wrapper.referenceCount = null;
+        if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) {
+          try {
+            wrapper.closeActual();
+          } catch (Exception e) {
+            // Best effort: a failing close must not propagate out of the release task.
+            LOG.error("Unable to close.", e);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Reference-counted wrapper handed out by the factory in place of the actual {@link
+   * FlinkExecutableStageContext}. It delegates the stage operations to the wrapped context and
+   * does not expose the wrapped context's {@code equals}.
+   */
+  private class WrappedContext implements FlinkExecutableStageContext {
+    private final JobInfo jobInfo;
+    private final FlinkExecutableStageContext context;
+    // Set to null by release() as a tombstone once the last reference is gone.
+    private AtomicInteger referenceCount;
+
+    /** Wraps the given context for the given job with an initial reference count of zero. */
+    WrappedContext(JobInfo jobInfo, FlinkExecutableStageContext context) {
+      this.jobInfo = jobInfo;
+      this.context = context;
+      this.referenceCount = new AtomicInteger(0);
+    }
+
+    @Override
+    public StageBundleFactory getStageBundleFactory(ExecutableStage executableStage) {
+      return context.getStageBundleFactory(executableStage);
+    }
+
+    @Override
+    public StateRequestHandler getStateRequestHandler(
+        ExecutableStage executableStage, RuntimeContext runtimeContext) {
+      return context.getStateRequestHandler(executableStage, runtimeContext);
+    }
+
+    /** Does not close the wrapped context directly; only schedules a delayed release. */
+    @Override
+    public void close() {
+      // Just schedule the context as we want to reuse it if possible.
+      scheduleRelease(jobInfo);
+    }
+
+    /** Closes the wrapped context. */
+    private void closeActual() throws Exception {
+      context.close();
+    }
+
+    /** Equality is only based on {@link JobInfo#jobId()}. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      WrappedContext that = (WrappedContext) o;
+      return Objects.equals(jobInfo.jobId(), that.jobInfo.jobId());
+    }
+
+    /** Hashes the same key that {@link #equals(Object)} compares, i.e. the job id. */
+    @Override
+    public int hashCode() {
+      return Objects.hash(jobInfo.jobId());
+    }
+
+    @Override
+    public String toString() {
+      return "ContextWrapper{"
+          + "jobId='"
+          + jobInfo.jobId()
+          + '\''
+          + ", referenceCount="
+          + referenceCount
+          + '}';
+    }
+  }
+
+  /**
+   * Serializable function that builds the actual {@link FlinkExecutableStageContext} for a {@link
+   * JobInfo}. It may throw; {@code get} wraps any failure in a {@link RuntimeException}.
+   */
+  @FunctionalInterface
+  public interface Creator
+      extends ThrowingFunction<JobInfo, FlinkExecutableStageContext>, Serializable {}
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 2c042c7..d63a3cb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -150,6 +150,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
   public void close() throws Exception {
     try (AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
     // Remove the reference to stageContext and make stageContext available for garbage collection.
+    try (AutoCloseable closable = stageContext) {}
     stageContext = null;
     super.close();
   }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
new file mode 100644
index 0000000..cf75864
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.Creator;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ReferenceCountingFlinkExecutableStageContextFactory}. */
+@RunWith(JUnit4.class)
+public class ReferenceCountingFlinkExecutableStageContextFactoryTest {
+
+  /**
+   * Walks two jobs through get/release cycles and checks that a context is reused while it still
+   * has open references and that a new one is created after the count has dropped to zero.
+   *
+   * <p>{@code release} is called directly, so the TTL-delayed scheduling is not exercised here.
+   */
+  @Test
+  public void testCreateReuseReleaseCreate() throws Exception {
+
+    // The creator hands out a different mock context on each of its four invocations.
+    Creator creator = mock(Creator.class);
+    FlinkExecutableStageContext c1 = mock(FlinkExecutableStageContext.class);
+    FlinkExecutableStageContext c2 = mock(FlinkExecutableStageContext.class);
+    FlinkExecutableStageContext c3 = mock(FlinkExecutableStageContext.class);
+    FlinkExecutableStageContext c4 = mock(FlinkExecutableStageContext.class);
+    when(creator.apply(any(JobInfo.class)))
+        .thenReturn(c1)
+        .thenReturn(c2)
+        .thenReturn(c3)
+        .thenReturn(c4);
+    ReferenceCountingFlinkExecutableStageContextFactory factory =
+        ReferenceCountingFlinkExecutableStageContextFactory.create(creator);
+    // Two jobs with distinct ids, so each gets its own cache entry.
+    JobInfo jobA = mock(JobInfo.class);
+    when(jobA.jobId()).thenReturn("jobA");
+    JobInfo jobB = mock(JobInfo.class);
+    when(jobB.jobId()).thenReturn("jobB");
+    FlinkExecutableStageContext ac1A = factory.get(jobA); // 1 open jobA
+    FlinkExecutableStageContext ac2B = factory.get(jobB); // 1 open jobB
+    Assert.assertSame(
+        "Context should be cached and reused.", ac1A, factory.get(jobA)); // 2 open jobA
+    Assert.assertSame(
+        "Context should be cached and reused.", ac2B, factory.get(jobB)); // 2 open jobB
+    factory.release(ac1A); // 1 open jobA
+    Assert.assertSame(
+        "Context should be cached and reused.", ac1A, factory.get(jobA)); // 2 open jobA
+    factory.release(ac1A); // 1 open jobA
+    factory.release(ac1A); // 0 open jobA
+    // The count reached zero, so the next get must produce a fresh wrapper.
+    FlinkExecutableStageContext ac3A = factory.get(jobA); // 1 open jobA
+    Assert.assertNotSame("We should get a new instance.", ac1A, ac3A);
+    Assert.assertSame(
+        "Context should be cached and reused.", ac3A, factory.get(jobA)); // 2 open jobA
+    factory.release(ac3A); // 1 open jobA
+    factory.release(ac3A); // 0 open jobA
+    // jobB was never fully released, so its original wrapper is still the cached one.
+    Assert.assertSame(
+        "Context should be cached and reused.", ac2B, factory.get(jobB)); // 3 open jobB
+    factory.release(ac2B); // 2 open jobB
+    factory.release(ac2B); // 1 open jobB
+    factory.release(ac2B); // 0 open jobB
+    FlinkExecutableStageContext ac4B = factory.get(jobB); // 1 open jobB
+    Assert.assertNotSame("We should get a new instance.", ac2B, ac4B);
+    factory.release(ac4B); // 0 open jobB
+  }
+}