You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2018/12/29 12:15:42 UTC
[beam] branch master updated: [BEAM-6329] Address synchronization
issue for portable timers (#7359)
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ec6384a [BEAM-6329] Address synchronization issue for portable timers (#7359)
ec6384a is described below
commit ec6384a7d31696c40fd566a956b782f85e017bd1
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sat Dec 29 13:15:33 2018 +0100
[BEAM-6329] Address synchronization issue for portable timers (#7359)
The current active key needs to be set on Flink's state backend to support deleting pending timers on timer registration or firing. A keyed map for pending timers is necessary because Flink only allows deleting/resetting timers by their original time, not by their ids.
Setting the current active key can interfere with access to user-defined state, and it can also interfere when a timer is set at the same time that another timer fires. In cases where the key had been initialized by the state requests, this would have caused pending timers to be associated with the wrong key, which would have led to problems deleting or resetting them.
---
.../runners/flink/translation/utils/NoopLock.java | 72 ++++++++++++
.../wrappers/streaming/DoFnOperator.java | 1 -
.../streaming/ExecutableStageDoFnOperator.java | 126 +++++++++++++++------
3 files changed, 163 insertions(+), 36 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/NoopLock.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/NoopLock.java
new file mode 100644
index 0000000..ee65c22
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/NoopLock.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.utils;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import javax.annotation.Nonnull;
+
+/**
+ * A lock which can always be acquired. It should not be used when a proper lock is required, but it
+ * is useful as a performance optimization when locking is not necessary but the code paths have to
+ * be shared between the locking and the non-locking variant.
+ *
+ * <p>For example, in {@link
+ * org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator}, the
+ * locking on the state backend is only required when both timers and state are used.
+ *
+ * <p>The class holds no state. A single shared instance is handed out by {@link #get()}, and that
+ * same instance is restored when a serialized {@code NoopLock} is read back.
+ */
+public class NoopLock implements Lock, Serializable {
+
+  /**
+   * The one shared instance. It is created eagerly during class initialization, so no
+   * synchronization is needed in {@link #get()}.
+   */
+  private static final NoopLock INSTANCE = new NoopLock();
+
+  /**
+   * Returns the shared {@code NoopLock} instance.
+   *
+   * @return the singleton instance, never {@code null}
+   */
+  public static NoopLock get() {
+    return INSTANCE;
+  }
+
+  private NoopLock() {}
+
+  /** Replaces any deserialized copy with the shared instance to keep this class a singleton. */
+  private Object readResolve() {
+    return INSTANCE;
+  }
+
+  /** Does nothing; the lock is always considered acquired. */
+  @Override
+  public void lock() {}
+
+  /** Does nothing; the lock is always considered acquired. */
+  @Override
+  public void lockInterruptibly() {}
+
+  /**
+   * Always succeeds.
+   *
+   * @return {@code true}
+   */
+  @Override
+  public boolean tryLock() {
+    return true;
+  }
+
+  /**
+   * Always succeeds immediately; the timeout arguments are ignored.
+   *
+   * @return {@code true}
+   */
+  @Override
+  public boolean tryLock(long time, @Nonnull TimeUnit unit) {
+    return true;
+  }
+
+  /** Does nothing; there is nothing to release. */
+  @Override
+  public void unlock() {}
+
+  /**
+   * Conditions are not supported by this lock.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Nonnull
+  @Override
+  public Condition newCondition() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 73e0bed..ac16d35 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -947,7 +947,6 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
@Override
public void setTimer(TimerData timer) {
try {
- getKeyedStateBackend().setCurrentKey(getCurrentKey());
String contextTimerId = getContextTimerId(timer);
// Only one timer can exist at a time for a given timer id and context.
// If a timer gets set twice in the same context, the second must
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 5270a51..af1c606 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -33,6 +33,9 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
@@ -48,6 +51,7 @@ import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
+import org.apache.beam.runners.flink.translation.utils.NoopLock;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
@@ -100,6 +104,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
private final Map<String, TupleTag<?>> outputMap;
private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds;
private final boolean usesTimers;
+ /** A lock which has to be acquired when concurrently accessing state and setting timers. */
+ private final Lock stateBackendLock;
private transient FlinkExecutableStageContext stageContext;
private transient StateRequestHandler stateRequestHandler;
@@ -150,6 +156,18 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
this.outputMap = outputMap;
this.sideInputIds = sideInputIds;
this.usesTimers = payload.getTimersCount() > 0;
+ if (usesTimers) {
+ // We only need to lock if we have timers. 1) Timers can
+ // interfere with state access. 2) Even without state access,
+ // setting timers can interfere with firing timers.
+ this.stateBackendLock = new ReentrantLock();
+ } else {
+ // Plain state access is guaranteed to not interfere with the state
+ // backend. The current key of the state backend is set manually before
+ // accessing the keyed state. Flink's automatic key setting before
+ // processing elements is overridden in this class.
+ this.stateBackendLock = NoopLock.get();
+ }
}
@Override
@@ -210,7 +228,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
userStateRequestHandler =
StateRequestHandlers.forBagUserStateHandlerFactory(
stageBundleFactory.getProcessBundleDescriptor(),
- new BagUserStateFactory(keyedStateInternals, getKeyedStateBackend()));
+ new BagUserStateFactory(
+ keyedStateInternals, getKeyedStateBackend(), stateBackendLock));
} else {
userStateRequestHandler = StateRequestHandler.unsupported();
}
@@ -227,12 +246,16 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
private final StateInternals stateInternals;
private final KeyedStateBackend<ByteBuffer> keyedStateBackend;
+ private final Lock stateBackendLock;
private BagUserStateFactory(
- StateInternals stateInternals, KeyedStateBackend<ByteBuffer> keyedStateBackend) {
+ StateInternals stateInternals,
+ KeyedStateBackend<ByteBuffer> keyedStateBackend,
+ Lock stateBackendLock) {
this.stateInternals = stateInternals;
this.keyedStateBackend = keyedStateBackend;
+ this.stateBackendLock = stateBackendLock;
}
@Override
@@ -246,31 +269,46 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
return new StateRequestHandlers.BagUserStateHandler<K, V, W>() {
@Override
public Iterable<V> get(K key, W window) {
- prepareStateBackend(key, keyCoder);
- StateNamespace namespace = StateNamespaces.window(windowCoder, window);
- BagState<V> bagState =
- stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
- return bagState.read();
+ try {
+ stateBackendLock.lock();
+ prepareStateBackend(key, keyCoder);
+ StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+ BagState<V> bagState =
+ stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
+ return bagState.read();
+ } finally {
+ stateBackendLock.unlock();
+ }
}
@Override
public void append(K key, W window, Iterator<V> values) {
- prepareStateBackend(key, keyCoder);
- StateNamespace namespace = StateNamespaces.window(windowCoder, window);
- BagState<V> bagState =
- stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
- while (values.hasNext()) {
- bagState.add(values.next());
+ try {
+ stateBackendLock.lock();
+ prepareStateBackend(key, keyCoder);
+ StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+ BagState<V> bagState =
+ stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
+ while (values.hasNext()) {
+ bagState.add(values.next());
+ }
+ } finally {
+ stateBackendLock.unlock();
}
}
@Override
public void clear(K key, W window) {
- prepareStateBackend(key, keyCoder);
- StateNamespace namespace = StateNamespaces.window(windowCoder, window);
- BagState<V> bagState =
- stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
- bagState.clear();
+ try {
+ stateBackendLock.lock();
+ prepareStateBackend(key, keyCoder);
+ StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+ BagState<V> bagState =
+ stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
+ bagState.clear();
+ } finally {
+ stateBackendLock.unlock();
+ }
}
private void prepareStateBackend(K key, Coder<K> keyCoder) {
@@ -312,6 +350,25 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
return sdkHarnessRunner.getCurrentTimerKey();
}
+  /**
+   * Registers a timer for the key extracted from {@code timerElement}.
+   *
+   * <p>The key is announced to the {@code sdkHarnessRunner} for the duration of the call and
+   * reset to {@code null} afterwards, whether or not the registration succeeded.
+   *
+   * @param timerElement the element from which the timer key is derived via {@code keySelector}
+   * @param timerData the timer to register with {@code timerInternals}
+   * @throws RuntimeException wrapping any exception raised while extracting the key or setting
+   *     the timer
+   */
+  private void setTimer(WindowedValue<InputT> timerElement, TimerInternals.TimerData timerData) {
+    try {
+      Object key = keySelector.getKey(timerElement);
+      sdkHarnessRunner.setCurrentTimerKey(key);
+      // We have to synchronize to ensure the state backend is not concurrently accessed by the
+      // state requests. The lock is acquired before entering the inner try block so that the
+      // finally clause only ever releases a lock which has actually been acquired.
+      stateBackendLock.lock();
+      try {
+        getKeyedStateBackend().setCurrentKey(key);
+        timerInternals.setTimer(timerData);
+      } finally {
+        stateBackendLock.unlock();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't set timer", e);
+    } finally {
+      sdkHarnessRunner.setCurrentTimerKey(null);
+    }
+  }
+
@Override
public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
// We need to decode the key
@@ -328,7 +385,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
}
// Prepare the SdkHarnessRunner with the key for the timer
sdkHarnessRunner.setCurrentTimerKey(decodedKey);
- super.fireTimer(timer);
+ // We have to synchronize to ensure the state backend is not concurrently accessed by the state requests
+ try {
+ stateBackendLock.lock();
+ getKeyedStateBackend().setCurrentKey(encodedKey);
+ super.fireTimer(timer);
+ } finally {
+ stateBackendLock.unlock();
+ }
}
@Override
@@ -371,7 +435,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
outputMap,
(Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(),
keySelector,
- timerInternals);
+ this::setTimer);
return sdkHarnessRunner;
}
@@ -444,7 +508,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
private final Coder<BoundedWindow> windowCoder;
private final KeySelector<WindowedValue<InputT>, ?> keySelector;
- private final TimerInternals timerInternals;
+ private final BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration;
private RemoteBundle remoteBundle;
private FnDataReceiver<WindowedValue<?>> mainInputReceiver;
@@ -464,7 +528,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
Map<String, TupleTag<?>> outputMap,
Coder<BoundedWindow> windowCoder,
KeySelector<WindowedValue<InputT>, ?> keySelector,
- TimerInternals timerInternals) {
+ BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration) {
this.mainInput = mainInput;
this.stageBundleFactory = stageBundleFactory;
this.stateRequestHandler = stateRequestHandler;
@@ -472,7 +536,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
this.outputManager = outputManager;
this.outputMap = outputMap;
this.keySelector = keySelector;
- this.timerInternals = timerInternals;
+ this.timerRegistration = timerRegistration;
this.timerOutputIdToSpecMap = new HashMap<>();
// Gather all timers from all transforms by their output pCollectionId which is unique
for (Map<String, ProcessBundleDescriptors.TimerSpec> transformTimerMap :
@@ -550,6 +614,9 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
@Override
public void finishBundle() {
+ if (remoteBundle == null) {
+ return;
+ }
try {
// TODO: it would be nice to emit results as they arrive, can thread wait non-blocking?
// close blocks until all results are received
@@ -617,23 +684,12 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
namespace,
timer.getTimestamp(),
timerSpec.getTimerSpec().getTimeDomain());
- setTimer(windowedValue, timerData);
+ timerRegistration.accept(windowedValue, timerData);
}
}
}
}
- private void setTimer(WindowedValue timerElement, TimerInternals.TimerData timerData) {
- try {
- currentTimerKey = keySelector.getKey(timerElement);
- timerInternals.setTimer(timerData);
- } catch (Exception e) {
- throw new RuntimeException("Couldn't set timer", e);
- } finally {
- currentTimerKey = null;
- }
- }
-
@Override
public DoFn<InputT, OutputT> getFn() {
throw new UnsupportedOperationException();