You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 07:55:46 UTC

[flink-statefun] 02/06: [FLINK-16244] Use PendingAsyncOperations in AsyncSink

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit f367b40e9096e89a7afe04c93b067bb7b3f25a1e
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Mon Feb 24 21:59:47 2020 +0100

    [FLINK-16244] Use PendingAsyncOperations in AsyncSink
    
    This closes #33.
---
 .../flink/core/functions/AsyncMessageDecorator.java       | 13 ++++---------
 .../core/functions/AsyncOperationFailureNotifier.java     | 11 +++--------
 .../flink/statefun/flink/core/functions/AsyncSink.java    | 11 +++--------
 .../flink/core/functions/FunctionGroupOperator.java       |  9 ++++++++-
 .../flink/statefun/flink/core/functions/Reductions.java   | 15 ++++++++++++++-
 5 files changed, 32 insertions(+), 27 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
index 8e32ec2..ee6e869 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
@@ -18,7 +18,6 @@
 package org.apache.flink.statefun.flink.core.functions;
 
 import javax.annotation.Nullable;
-import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.message.MessageFactory;
@@ -31,7 +30,7 @@ import org.apache.flink.statefun.sdk.AsyncOperationResult.Status;
  * with an async operation.
  */
 final class AsyncMessageDecorator<T> implements Message {
-  private final MapState<Long, Message> pendingAsyncOperations;
+  private final PendingAsyncOperations pendingAsyncOperations;
   private final long futureId;
   private final Message message;
   private final Throwable throwable;
@@ -39,7 +38,7 @@ final class AsyncMessageDecorator<T> implements Message {
   private final boolean restored;
 
   AsyncMessageDecorator(
-      MapState<Long, Message> pendingAsyncOperations,
+      PendingAsyncOperations pendingAsyncOperations,
       long futureId,
       Message message,
       T result,
@@ -53,7 +52,7 @@ final class AsyncMessageDecorator<T> implements Message {
   }
 
   AsyncMessageDecorator(
-      MapState<Long, Message> asyncOperationState, Long futureId, Message metadataMessage) {
+      PendingAsyncOperations asyncOperationState, Long futureId, Message metadataMessage) {
     this.futureId = futureId;
     this.pendingAsyncOperations = asyncOperationState;
     this.message = metadataMessage;
@@ -94,11 +93,7 @@ final class AsyncMessageDecorator<T> implements Message {
 
   @Override
   public void postApply() {
-    try {
-      pendingAsyncOperations.remove(futureId);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    pendingAsyncOperations.remove(source(), futureId);
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncOperationFailureNotifier.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncOperationFailureNotifier.java
index 6d393c9..89cc23a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncOperationFailureNotifier.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncOperationFailureNotifier.java
@@ -33,12 +33,11 @@ final class AsyncOperationFailureNotifier
   static void fireExpiredAsyncOperations(
       MapStateDescriptor<Long, Message> asyncOperationStateDescriptor,
       Reductions reductions,
-      MapState<Long, Message> asyncOperationState,
       KeyedStateBackend<String> keyedStateBackend)
       throws Exception {
 
     AsyncOperationFailureNotifier asyncOperationFailureNotifier =
-        new AsyncOperationFailureNotifier(reductions, asyncOperationState);
+        new AsyncOperationFailureNotifier(reductions);
 
     keyedStateBackend.applyToAllKeys(
         VoidNamespace.get(),
@@ -52,14 +51,11 @@ final class AsyncOperationFailureNotifier
   }
 
   private final Reductions reductions;
-  private final MapState<Long, Message> asyncOperationState;
 
   private boolean enqueued;
 
-  private AsyncOperationFailureNotifier(
-      Reductions reductions, MapState<Long, Message> asyncOperationState) {
+  private AsyncOperationFailureNotifier(Reductions reductions) {
     this.reductions = Objects.requireNonNull(reductions);
-    this.asyncOperationState = Objects.requireNonNull(asyncOperationState);
   }
 
   @Override
@@ -67,8 +63,7 @@ final class AsyncOperationFailureNotifier
     for (Entry<Long, Message> entry : state.entries()) {
       Long futureId = entry.getKey();
       Message metadataMessage = entry.getValue();
-      Message adaptor = new AsyncMessageDecorator(asyncOperationState, futureId, metadataMessage);
-      reductions.enqueue(adaptor);
+      reductions.enqueueAsyncOperationAfterRestore(futureId, metadataMessage);
       enqueued = true;
     }
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
index aa8497b..873ab34 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
@@ -22,7 +22,6 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
 import org.apache.flink.statefun.flink.core.di.Inject;
 import org.apache.flink.statefun.flink.core.di.Label;
@@ -33,7 +32,7 @@ import org.apache.flink.statefun.flink.core.queue.MpscQueue;
 import org.apache.flink.statefun.sdk.Address;
 
 final class AsyncSink {
-  private final MapState<Long, Message> pendingAsyncOperations;
+  private final PendingAsyncOperations pendingAsyncOperations;
   private final Lazy<Reductions> reductions;
   private final Executor operatorMailbox;
   private final BackPressureValve backPressureValve;
@@ -42,7 +41,7 @@ final class AsyncSink {
 
   @Inject
   AsyncSink(
-      @Label("async-operations") MapState<Long, Message> pendingAsyncOperations,
+      PendingAsyncOperations pendingAsyncOperations,
       @Label("mailbox-executor") Executor operatorMailbox,
       @Label("reductions") Lazy<Reductions> reductions,
       @Label("backpressure-valve") BackPressureValve backPressureValve) {
@@ -60,11 +59,7 @@ final class AsyncSink {
     // 2. after recovery, we clear that state by notifying the owning function that we don't know
     // what happened
     // with that particular async operation.
-    try {
-      pendingAsyncOperations.put(futureId, metadata);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    pendingAsyncOperations.add(metadata.source(), futureId, metadata);
     backPressureValve.notifyAsyncOperationRegistered();
     future.whenComplete((result, throwable) -> enqueue(metadata, futureId, result, throwable));
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 713368e..a5d7335 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
@@ -125,7 +126,13 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
     // expire all the pending async operations.
     //
     AsyncOperationFailureNotifier.fireExpiredAsyncOperations(
-        asyncOperationStateDescriptor, reductions, asyncOperationState, getKeyedStateBackend());
+        asyncOperationStateDescriptor, reductions, getKeyedStateBackend());
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+    reductions.snapshotAsyncOperations();
   }
 
   // ------------------------------------------------------------------------------------------------------------------
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
index 7561541..27f7b10 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
@@ -45,10 +45,12 @@ import org.apache.flink.util.OutputTag;
 
 final class Reductions {
   private final LocalFunctionGroup localFunctionGroup;
+  private final PendingAsyncOperations pendingAsyncOperations;
 
   @Inject
-  Reductions(LocalFunctionGroup functionGroup) {
+  Reductions(PendingAsyncOperations pendingAsyncOperations, LocalFunctionGroup functionGroup) {
     this.localFunctionGroup = Objects.requireNonNull(functionGroup);
+    this.pendingAsyncOperations = Objects.requireNonNull(pendingAsyncOperations);
   }
 
   static Reductions create(
@@ -117,6 +119,7 @@ final class Reductions {
     // for the async operations
     container.add("async-operations", MapState.class, asyncOperations);
     container.add(AsyncSink.class);
+    container.add(PendingAsyncOperations.class);
 
     container.add("backpressure-valve", BackPressureValve.class, valve);
 
@@ -132,10 +135,20 @@ final class Reductions {
     localFunctionGroup.enqueue(message);
   }
 
+  void enqueueAsyncOperationAfterRestore(Long futureId, Message metadataMessage) {
+    Message adaptor =
+        new AsyncMessageDecorator<>(pendingAsyncOperations, futureId, metadataMessage);
+    enqueue(adaptor);
+  }
+
   @SuppressWarnings("StatementWithEmptyBody")
   void processEnvelopes() {
     while (localFunctionGroup.processNextEnvelope()) {
       // TODO: consider preemption if too many local messages.
     }
   }
+
+  void snapshotAsyncOperations() {
+    pendingAsyncOperations.flush();
+  }
 }