You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2018/08/22 00:44:15 UTC
[beam] branch master updated: [BEAM-5110] Explicitly count the references for BatchFlinkExecutableStageContext … (#6189)
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7c41e0a [BEAM-5110] Explicitly count the references for BatchFlinkExecutableStageContext … (#6189)
7c41e0a is described below
commit 7c41e0a915083bd3b1fe52c2a417fa38a00e6463
Author: Ankur <an...@users.noreply.github.com>
AuthorDate: Tue Aug 21 17:44:09 2018 -0700
[BEAM-5110] Explicitly count the references for BatchFlinkExecutableStageContext … (#6189)
* Explicitly count the references for BatchFlinkExecutableStageContext and clean them after TTL
* Separating out ReferenceCountingFlinkExecutableStageContextFactory
* Making ReferenceCountingFlinkExecutableStageContextFactory serializable
---
.../FlinkBatchExecutableStageContext.java | 29 +--
.../functions/FlinkExecutableStageContext.java | 6 +-
.../functions/FlinkExecutableStageFunction.java | 4 +-
...CountingFlinkExecutableStageContextFactory.java | 229 +++++++++++++++++++++
.../streaming/ExecutableStageDoFnOperator.java | 1 +
...tingFlinkExecutableStageContextFactoryTest.java | 80 +++++++
6 files changed, 323 insertions(+), 26 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
index 9f7c171..5ba04c6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
@@ -17,9 +17,6 @@
*/
package org.apache.beam.runners.flink.translation.functions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import java.io.IOException;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
@@ -35,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Implementation of a {@link FlinkExecutableStageContext} for batch jobs. */
-class FlinkBatchExecutableStageContext implements FlinkExecutableStageContext {
+class FlinkBatchExecutableStageContext implements FlinkExecutableStageContext, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchExecutableStageContext.class);
private final JobBundleFactory jobBundleFactory;
@@ -68,32 +65,20 @@ class FlinkBatchExecutableStageContext implements FlinkExecutableStageContext {
}
@Override
- protected void finalize() throws Exception {
+ public void close() throws Exception {
jobBundleFactory.close();
}
enum BatchFactory implements Factory {
- INSTANCE;
+ REFERENCE_COUNTING;
- @SuppressWarnings("Immutable") // observably immutable
- private final LoadingCache<JobInfo, FlinkBatchExecutableStageContext> cachedContexts;
-
- BatchFactory() {
- cachedContexts =
- CacheBuilder.newBuilder()
- .weakValues()
- .build(
- new CacheLoader<JobInfo, FlinkBatchExecutableStageContext>() {
- @Override
- public FlinkBatchExecutableStageContext load(JobInfo jobInfo) throws Exception {
- return create(jobInfo);
- }
- });
- }
+ private static final ReferenceCountingFlinkExecutableStageContextFactory actualFactory =
+ ReferenceCountingFlinkExecutableStageContextFactory.create(
+ FlinkBatchExecutableStageContext::create);
@Override
public FlinkExecutableStageContext get(JobInfo jobInfo) {
- return cachedContexts.getUnchecked(jobInfo);
+ return actualFactory.get(jobInfo);
}
}
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
index a1e516c..ba59b53 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
@@ -25,18 +25,20 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.flink.api.common.functions.RuntimeContext;
/** The Flink context required in order to execute {@link ExecutableStage stages}. */
-public interface FlinkExecutableStageContext {
+public interface FlinkExecutableStageContext extends AutoCloseable {
/**
* Creates {@link FlinkExecutableStageContext} instances. Serializable so that factories can be
* defined at translation time and distributed to TaskManagers.
*/
interface Factory extends Serializable {
+
+ /** Get or create {@link FlinkExecutableStageContext} for given {@link JobInfo}. */
FlinkExecutableStageContext get(JobInfo jobInfo);
}
static Factory batchFactory() {
- return FlinkBatchExecutableStageContext.BatchFactory.INSTANCE;
+ return FlinkBatchExecutableStageContext.BatchFactory.REFERENCE_COUNTING;
}
StageBundleFactory getStageBundleFactory(ExecutableStage executableStage);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index 3da58e9..c501274 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -65,8 +65,8 @@ public class FlinkExecutableStageFunction<InputT>
// Worker-local fields. These should only be constructed and consumed on Flink TaskManagers.
private transient RuntimeContext runtimeContext;
- private transient FlinkExecutableStageContext stageContext;
private transient StateRequestHandler stateRequestHandler;
+ private transient FlinkExecutableStageContext stageContext;
private transient StageBundleFactory stageBundleFactory;
private transient BundleProgressHandler progressHandler;
@@ -127,7 +127,7 @@ public class FlinkExecutableStageFunction<InputT>
@Override
public void close() throws Exception {
try (AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
- // Remove the reference to stageContext and make stageContext available for garbage collection.
+ try (AutoCloseable closable = stageContext) {}
stageContext = null;
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
new file mode 100644
index 0000000..5121038
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FlinkExecutableStageContext.Factory} which counts FlinkExecutableStageContext reference
+ * for book keeping.
+ */
+public class ReferenceCountingFlinkExecutableStageContextFactory
+ implements FlinkExecutableStageContext.Factory {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReferenceCountingFlinkExecutableStageContextFactory.class);
+ private static final int TTL_IN_SECONDS = 30;
+ private static final int MAX_RETRY = 3;
+
+ private final Creator creator;
+ private transient volatile ScheduledExecutorService executor;
+ private transient volatile ConcurrentHashMap<String, WrappedContext> keyRegistry;
+
+ public static ReferenceCountingFlinkExecutableStageContextFactory create(Creator creator) {
+ return new ReferenceCountingFlinkExecutableStageContextFactory(creator);
+ }
+
+ private ReferenceCountingFlinkExecutableStageContextFactory(Creator creator) {
+ this.creator = creator;
+ }
+
+ @Override
+ public FlinkExecutableStageContext get(JobInfo jobInfo) {
+ // Retry is needed in case where an existing wrapper is picked from the cache but by
+ // the time we accessed wrapper.referenceCount, the wrapper was tombstoned by a pending
+ // release task.
+ // This race condition is highly unlikely to happen as there is no systematic coding
+ // practice which can cause this error because of TTL. However, even in very unlikely case
+ // when it happen we have the retry which get a valid context.
+ // Note: There is no leak in this logic as the cleanup is only done in release.
+ // In case of usage error where release is called before corresponding get finishes,
+ // release might throw an error. If release did not throw an error than we can be sure that
+ // the state of the system remains valid and appropriate cleanup will be done at TTL.
+ for (int retry = 0; retry < MAX_RETRY; retry++) {
+ // ConcurrentHashMap will handle the thread safety at the creation time.
+ WrappedContext wrapper =
+ getCache()
+ .computeIfAbsent(
+ jobInfo.jobId(),
+ jobId -> {
+ try {
+ return new WrappedContext(jobInfo, creator.apply(jobInfo));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Unable to create context for job " + jobInfo.jobId(), e);
+ }
+ });
+ // Take a lock on wrapper before modifying reference count.
+ // Use null referenceCount == null as a tombstone for the wrapper.
+ synchronized (wrapper) {
+ if (wrapper.referenceCount != null) {
+ // The wrapper is still valid.
+ // Release has not yet got the lock and has not yet removed the wrapper.
+ wrapper.referenceCount.incrementAndGet();
+ return wrapper;
+ }
+ }
+ }
+
+ throw new RuntimeException(
+ String.format(
+ "Max retry %s exhausted while creating Context for job %s",
+ MAX_RETRY, jobInfo.jobId()));
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void scheduleRelease(JobInfo jobInfo) {
+ WrappedContext wrapper = getCache().get(jobInfo.jobId());
+ Preconditions.checkState(
+ wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId());
+ // Schedule task to clean the container later.
+ getExecutor().schedule(() -> release(wrapper), TTL_IN_SECONDS, TimeUnit.SECONDS);
+ }
+
+ private ConcurrentHashMap<String, WrappedContext> getCache() {
+ // Lazily initialize keyRegistry because serialization will set it to null.
+ if (keyRegistry != null) {
+ return keyRegistry;
+ }
+ synchronized (this) {
+ if (keyRegistry == null) {
+ keyRegistry = new ConcurrentHashMap<>();
+ }
+ return keyRegistry;
+ }
+ }
+
+ private ScheduledExecutorService getExecutor() {
+ // Lazily initialize executor because serialization will set it to null.
+ if (executor != null) {
+ return executor;
+ }
+ synchronized (this) {
+ if (executor == null) {
+ executor =
+ Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
+ }
+ return executor;
+ }
+ }
+
+ @VisibleForTesting
+ void release(FlinkExecutableStageContext context) {
+ @SuppressWarnings({"unchecked", "Not exected to be called from outside."})
+ WrappedContext wrapper = (WrappedContext) context;
+ synchronized (wrapper) {
+ if (wrapper.referenceCount.decrementAndGet() == 0) {
+ // Tombstone wrapper.
+ wrapper.referenceCount = null;
+ if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) {
+ try {
+ wrapper.closeActual();
+ } catch (Exception e) {
+ LOG.error("Unable to close.", e);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * {@link WrappedContext} does not expose equals of actual {@link FlinkExecutableStageContext}.
+ */
+ private class WrappedContext implements FlinkExecutableStageContext {
+ private JobInfo jobInfo;
+ private AtomicInteger referenceCount;
+ private FlinkExecutableStageContext context;
+
+ /** {@link WrappedContext#equals(Object)} is only based on {@link JobInfo#jobId()}. */
+ WrappedContext(JobInfo jobInfo, FlinkExecutableStageContext context) {
+ this.jobInfo = jobInfo;
+ this.context = context;
+ this.referenceCount = new AtomicInteger(0);
+ }
+
+ @Override
+ public StageBundleFactory getStageBundleFactory(ExecutableStage executableStage) {
+ return context.getStageBundleFactory(executableStage);
+ }
+
+ @Override
+ public StateRequestHandler getStateRequestHandler(
+ ExecutableStage executableStage, RuntimeContext runtimeContext) {
+ return context.getStateRequestHandler(executableStage, runtimeContext);
+ }
+
+ @Override
+ public void close() {
+ // Just schedule the context as we want to reuse it if possible.
+ scheduleRelease(jobInfo);
+ }
+
+ private void closeActual() throws Exception {
+ context.close();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WrappedContext that = (WrappedContext) o;
+ return Objects.equals(jobInfo.jobId(), that.jobInfo.jobId());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobInfo);
+ }
+
+ @Override
+ public String toString() {
+ return "ContextWrapper{"
+ + "jobId='"
+ + jobInfo
+ + '\''
+ + ", referenceCount="
+ + referenceCount
+ + '}';
+ }
+ }
+
+ /** Interface for creator which extends Serializable. */
+ public interface Creator
+ extends ThrowingFunction<JobInfo, FlinkExecutableStageContext>, Serializable {}
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 2c042c7..d63a3cb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -150,6 +150,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
public void close() throws Exception {
try (AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
// Remove the reference to stageContext and make stageContext available for garbage collection.
+ try (AutoCloseable closable = stageContext) {}
stageContext = null;
super.close();
}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
new file mode 100644
index 0000000..cf75864
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.Creator;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ReferenceCountingFlinkExecutableStageContextFactory}. */
+@RunWith(JUnit4.class)
+public class ReferenceCountingFlinkExecutableStageContextFactoryTest {
+
+ @Test
+ public void testCreateReuseReleaseCreate() throws Exception {
+
+ Creator creator = mock(Creator.class);
+ FlinkExecutableStageContext c1 = mock(FlinkExecutableStageContext.class);
+ FlinkExecutableStageContext c2 = mock(FlinkExecutableStageContext.class);
+ FlinkExecutableStageContext c3 = mock(FlinkExecutableStageContext.class);
+ FlinkExecutableStageContext c4 = mock(FlinkExecutableStageContext.class);
+ when(creator.apply(any(JobInfo.class)))
+ .thenReturn(c1)
+ .thenReturn(c2)
+ .thenReturn(c3)
+ .thenReturn(c4);
+ ReferenceCountingFlinkExecutableStageContextFactory factory =
+ ReferenceCountingFlinkExecutableStageContextFactory.create(creator);
+ JobInfo jobA = mock(JobInfo.class);
+ when(jobA.jobId()).thenReturn("jobA");
+ JobInfo jobB = mock(JobInfo.class);
+ when(jobB.jobId()).thenReturn("jobB");
+ FlinkExecutableStageContext ac1A = factory.get(jobA); // 1 open jobA
+ FlinkExecutableStageContext ac2B = factory.get(jobB); // 1 open jobB
+ Assert.assertSame(
+ "Context should be cached and reused.", ac1A, factory.get(jobA)); // 2 open jobA
+ Assert.assertSame(
+ "Context should be cached and reused.", ac2B, factory.get(jobB)); // 2 open jobB
+ factory.release(ac1A); // 1 open jobA
+ Assert.assertSame(
+ "Context should be cached and reused.", ac1A, factory.get(jobA)); // 2 open jobA
+ factory.release(ac1A); // 1 open jobA
+ factory.release(ac1A); // 0 open jobA
+ FlinkExecutableStageContext ac3A = factory.get(jobA); // 1 open jobA
+ Assert.assertNotSame("We should get a new instance.", ac1A, ac3A);
+ Assert.assertSame(
+ "Context should be cached and reused.", ac3A, factory.get(jobA)); // 2 open jobA
+ factory.release(ac3A); // 1 open jobA
+ factory.release(ac3A); // 0 open jobA
+ Assert.assertSame(
+ "Context should be cached and reused.", ac2B, factory.get(jobB)); // 3 open jobB
+ factory.release(ac2B); // 2 open jobB
+ factory.release(ac2B); // 1 open jobB
+ factory.release(ac2B); // 0 open jobB
+ FlinkExecutableStageContext ac4B = factory.get(jobB); // 1 open jobB
+ Assert.assertNotSame("We should get a new instance.", ac2B, ac4B);
+ factory.release(ac4B); // 0 open jobB
+ }
+}