You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2018/12/29 12:15:42 UTC

[beam] branch master updated: [BEAM-6329] Address synchronization issue for portable timers (#7359)

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ec6384a  [BEAM-6329] Address synchronization issue for portable timers (#7359)
ec6384a is described below

commit ec6384a7d31696c40fd566a956b782f85e017bd1
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sat Dec 29 13:15:33 2018 +0100

    [BEAM-6329] Address synchronization issue for portable timers (#7359)
    
    The current active key needs to be set on Flink's state backend to support deleting pending timers on timer registration or firing. A keyed map for pending timers is necessary because Flink only allows deleting/resetting timers by their original time, not by their ids.
    
    Setting the current active key can interfere with accessing user-defined state, and it can also interfere when a timer is set at the same time that another timer fires. When the current key had been initialized by a state request, pending timers would have been associated with the wrong key, which would have led to problems deleting / resetting them.
---
 .../runners/flink/translation/utils/NoopLock.java  |  72 ++++++++++++
 .../wrappers/streaming/DoFnOperator.java           |   1 -
 .../streaming/ExecutableStageDoFnOperator.java     | 126 +++++++++++++++------
 3 files changed, 163 insertions(+), 36 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/NoopLock.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/NoopLock.java
new file mode 100644
index 0000000..ee65c22
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/NoopLock.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.utils;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import javax.annotation.Nonnull;
+
+/**
+ * A lock which can always be acquired. It should not be used when a proper lock is required, but it
+ * is useful as a performance optimization when locking is not necessary but the code paths have to
+ * be shared between the locking and the non-locking variant.
+ *
+ * <p>For example, in {@link
+ * org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator}, the
+ * locking on the state backend is only required when timers are used; without timers this no-op
+ * lock is used instead.
+ *
+ * <p>The class is a singleton: {@link #get()} always returns the same instance, and deserializing
+ * a {@code NoopLock} resolves to that instance as well.
+ */
+public class NoopLock implements Lock, Serializable {
+
+  /**
+   * The only instance. It is created eagerly during class initialization, which makes the
+   * publication thread-safe without any synchronization in {@link #get()}.
+   */
+  private static final NoopLock INSTANCE = new NoopLock();
+
+  /** Returns the shared {@code NoopLock} instance. */
+  public static NoopLock get() {
+    return INSTANCE;
+  }
+
+  private NoopLock() {}
+
+  /**
+   * Replaces a deserialized copy with the shared instance so that serialization does not create
+   * additional instances.
+   */
+  private Object readResolve() {
+    return INSTANCE;
+  }
+
+  /** Does nothing; the lock is always considered acquired. */
+  @Override
+  public void lock() {}
+
+  /** Does nothing; the lock is always considered acquired. */
+  @Override
+  public void lockInterruptibly() {}
+
+  /** Always succeeds and returns {@code true}. */
+  @Override
+  public boolean tryLock() {
+    return true;
+  }
+
+  /** Always succeeds immediately and returns {@code true}; the timeout is ignored. */
+  @Override
+  public boolean tryLock(long time, @Nonnull TimeUnit unit) {
+    return true;
+  }
+
+  /** Does nothing; there is no lock state to release. */
+  @Override
+  public void unlock() {}
+
+  /**
+   * Conditions are not supported by this lock.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Nonnull
+  @Override
+  public Condition newCondition() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 73e0bed..ac16d35 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -947,7 +947,6 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
     @Override
     public void setTimer(TimerData timer) {
       try {
-        getKeyedStateBackend().setCurrentKey(getCurrentKey());
         String contextTimerId = getContextTimerId(timer);
         // Only one timer can exist at a time for a given timer id and context.
         // If a timer gets set twice in the same context, the second must
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 5270a51..af1c606 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -33,6 +33,9 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
@@ -48,6 +51,7 @@ import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
+import org.apache.beam.runners.flink.translation.utils.NoopLock;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
@@ -100,6 +104,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
   private final Map<String, TupleTag<?>> outputMap;
   private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds;
   private final boolean usesTimers;
+  /** A lock which has to be acquired when concurrently accessing state and setting timers. */
+  private final Lock stateBackendLock;
 
   private transient FlinkExecutableStageContext stageContext;
   private transient StateRequestHandler stateRequestHandler;
@@ -150,6 +156,18 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
     this.outputMap = outputMap;
     this.sideInputIds = sideInputIds;
     this.usesTimers = payload.getTimersCount() > 0;
+    if (usesTimers) {
+      // We only need to lock if we have timers. 1) Timers can
+      // interfere with state access. 2) Even without state access,
+      // setting timers can interfere with firing timers.
+      this.stateBackendLock = new ReentrantLock();
+    } else {
+      // Plain state access is guaranteed to not interfere with the state
+      // backend. The current key of the state backend is set manually before
+      // accessing the keyed state. Flink's automatic key setting before
+      // processing elements is overridden in this class.
+      this.stateBackendLock = NoopLock.get();
+    }
   }
 
   @Override
@@ -210,7 +228,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
       userStateRequestHandler =
           StateRequestHandlers.forBagUserStateHandlerFactory(
               stageBundleFactory.getProcessBundleDescriptor(),
-              new BagUserStateFactory(keyedStateInternals, getKeyedStateBackend()));
+              new BagUserStateFactory(
+                  keyedStateInternals, getKeyedStateBackend(), stateBackendLock));
     } else {
       userStateRequestHandler = StateRequestHandler.unsupported();
     }
@@ -227,12 +246,16 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
 
     private final StateInternals stateInternals;
     private final KeyedStateBackend<ByteBuffer> keyedStateBackend;
+    private final Lock stateBackendLock;
 
     private BagUserStateFactory(
-        StateInternals stateInternals, KeyedStateBackend<ByteBuffer> keyedStateBackend) {
+        StateInternals stateInternals,
+        KeyedStateBackend<ByteBuffer> keyedStateBackend,
+        Lock stateBackendLock) {
 
       this.stateInternals = stateInternals;
       this.keyedStateBackend = keyedStateBackend;
+      this.stateBackendLock = stateBackendLock;
     }
 
     @Override
@@ -246,31 +269,46 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
       return new StateRequestHandlers.BagUserStateHandler<K, V, W>() {
         @Override
         public Iterable<V> get(K key, W window) {
-          prepareStateBackend(key, keyCoder);
-          StateNamespace namespace = StateNamespaces.window(windowCoder, window);
-          BagState<V> bagState =
-              stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
-          return bagState.read();
+          // Reads the bag state of the given key and window while holding the state backend lock.
+          // The lock is acquired before the try block so that the finally clause never unlocks a
+          // lock which has not been acquired.
+          stateBackendLock.lock();
+          try {
+            prepareStateBackend(key, keyCoder);
+            StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+            BagState<V> bagState =
+                stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
+            return bagState.read();
+          } finally {
+            stateBackendLock.unlock();
+          }
         }
 
         @Override
         public void append(K key, W window, Iterator<V> values) {
-          prepareStateBackend(key, keyCoder);
-          StateNamespace namespace = StateNamespaces.window(windowCoder, window);
-          BagState<V> bagState =
-              stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
-          while (values.hasNext()) {
-            bagState.add(values.next());
+          // Adds all values to the bag state of the given key and window while holding the state
+          // backend lock. The lock is acquired before the try block so that the finally clause
+          // never unlocks a lock which has not been acquired.
+          stateBackendLock.lock();
+          try {
+            prepareStateBackend(key, keyCoder);
+            StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+            BagState<V> bagState =
+                stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
+            while (values.hasNext()) {
+              bagState.add(values.next());
+            }
+          } finally {
+            stateBackendLock.unlock();
           }
         }
 
         @Override
         public void clear(K key, W window) {
-          prepareStateBackend(key, keyCoder);
-          StateNamespace namespace = StateNamespaces.window(windowCoder, window);
-          BagState<V> bagState =
-              stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
-          bagState.clear();
+          // Clears the bag state of the given key and window while holding the state backend lock.
+          // The lock is acquired before the try block so that the finally clause never unlocks a
+          // lock which has not been acquired.
+          stateBackendLock.lock();
+          try {
+            prepareStateBackend(key, keyCoder);
+            StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+            BagState<V> bagState =
+                stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
+            bagState.clear();
+          } finally {
+            stateBackendLock.unlock();
+          }
         }
 
         private void prepareStateBackend(K key, Coder<K> keyCoder) {
@@ -312,6 +350,25 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
     return sdkHarnessRunner.getCurrentTimerKey();
   }
 
+  /**
+   * Sets the given timer for the key of {@code timerElement}.
+   *
+   * <p>The key is extracted with the key selector and made the current timer key of the SDK
+   * harness runner as well as the current key of the keyed state backend before the timer is
+   * handed to the timer internals. The current timer key of the runner is reset to {@code null}
+   * afterwards, regardless of the outcome.
+   *
+   * @param timerElement the element from which the timer's key is extracted
+   * @param timerData the timer to set
+   * @throws RuntimeException if extracting the key or setting the timer fails
+   */
+  private void setTimer(WindowedValue<InputT> timerElement, TimerInternals.TimerData timerData) {
+    try {
+      Object key = keySelector.getKey(timerElement);
+      sdkHarnessRunner.setCurrentTimerKey(key);
+      // We have to synchronize to ensure the state backend is not concurrently accessed by the
+      // state requests. The lock is acquired before the try block so that the finally clause never
+      // unlocks a lock which has not been acquired.
+      stateBackendLock.lock();
+      try {
+        getKeyedStateBackend().setCurrentKey(key);
+        timerInternals.setTimer(timerData);
+      } finally {
+        stateBackendLock.unlock();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't set timer", e);
+    } finally {
+      sdkHarnessRunner.setCurrentTimerKey(null);
+    }
+  }
+
   @Override
   public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
     // We need to decode the key
@@ -328,7 +385,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
     }
     // Prepare the SdkHarnessRunner with the key for the timer
     sdkHarnessRunner.setCurrentTimerKey(decodedKey);
-    super.fireTimer(timer);
+    // We have to synchronize to ensure the state backend is not concurrently accessed by the state requests
+    try {
+      stateBackendLock.lock();
+      getKeyedStateBackend().setCurrentKey(encodedKey);
+      super.fireTimer(timer);
+    } finally {
+      stateBackendLock.unlock();
+    }
   }
 
   @Override
@@ -371,7 +435,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
             outputMap,
             (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(),
             keySelector,
-            timerInternals);
+            this::setTimer);
     return sdkHarnessRunner;
   }
 
@@ -444,7 +508,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
 
     private final Coder<BoundedWindow> windowCoder;
     private final KeySelector<WindowedValue<InputT>, ?> keySelector;
-    private final TimerInternals timerInternals;
+    private final BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration;
 
     private RemoteBundle remoteBundle;
     private FnDataReceiver<WindowedValue<?>> mainInputReceiver;
@@ -464,7 +528,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
         Map<String, TupleTag<?>> outputMap,
         Coder<BoundedWindow> windowCoder,
         KeySelector<WindowedValue<InputT>, ?> keySelector,
-        TimerInternals timerInternals) {
+        BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration) {
       this.mainInput = mainInput;
       this.stageBundleFactory = stageBundleFactory;
       this.stateRequestHandler = stateRequestHandler;
@@ -472,7 +536,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
       this.outputManager = outputManager;
       this.outputMap = outputMap;
       this.keySelector = keySelector;
-      this.timerInternals = timerInternals;
+      this.timerRegistration = timerRegistration;
       this.timerOutputIdToSpecMap = new HashMap<>();
       // Gather all timers from all transforms by their output pCollectionId which is unique
       for (Map<String, ProcessBundleDescriptors.TimerSpec> transformTimerMap :
@@ -550,6 +614,9 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
 
     @Override
     public void finishBundle() {
+      if (remoteBundle == null) {
+        return;
+      }
       try {
         // TODO: it would be nice to emit results as they arrive, can thread wait non-blocking?
         // close blocks until all results are received
@@ -617,23 +684,12 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
                     namespace,
                     timer.getTimestamp(),
                     timerSpec.getTimerSpec().getTimeDomain());
-            setTimer(windowedValue, timerData);
+            timerRegistration.accept(windowedValue, timerData);
           }
         }
       }
     }
 
-    private void setTimer(WindowedValue timerElement, TimerInternals.TimerData timerData) {
-      try {
-        currentTimerKey = keySelector.getKey(timerElement);
-        timerInternals.setTimer(timerData);
-      } catch (Exception e) {
-        throw new RuntimeException("Couldn't set timer", e);
-      } finally {
-        currentTimerKey = null;
-      }
-    }
-
     @Override
     public DoFn<InputT, OutputT> getFn() {
       throw new UnsupportedOperationException();