You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 07:55:46 UTC
[flink-statefun] 02/06: [FLINK-16244] Use PendingAsyncOperations in
AsyncSink
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit f367b40e9096e89a7afe04c93b067bb7b3f25a1e
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Mon Feb 24 21:59:47 2020 +0100
[FLINK-16244] Use PendingAsyncOperations in AsyncSink
This closes #33.
---
.../flink/core/functions/AsyncMessageDecorator.java | 13 ++++---------
.../core/functions/AsyncOperationFailureNotifier.java | 11 +++--------
.../flink/statefun/flink/core/functions/AsyncSink.java | 11 +++--------
.../flink/core/functions/FunctionGroupOperator.java | 9 ++++++++-
.../flink/statefun/flink/core/functions/Reductions.java | 15 ++++++++++++++-
5 files changed, 32 insertions(+), 27 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
index 8e32ec2..ee6e869 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
@@ -18,7 +18,6 @@
package org.apache.flink.statefun.flink.core.functions;
import javax.annotation.Nullable;
-import org.apache.flink.api.common.state.MapState;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactory;
@@ -31,7 +30,7 @@ import org.apache.flink.statefun.sdk.AsyncOperationResult.Status;
* with an async operation.
*/
final class AsyncMessageDecorator<T> implements Message {
- private final MapState<Long, Message> pendingAsyncOperations;
+ private final PendingAsyncOperations pendingAsyncOperations;
private final long futureId;
private final Message message;
private final Throwable throwable;
@@ -39,7 +38,7 @@ final class AsyncMessageDecorator<T> implements Message {
private final boolean restored;
AsyncMessageDecorator(
- MapState<Long, Message> pendingAsyncOperations,
+ PendingAsyncOperations pendingAsyncOperations,
long futureId,
Message message,
T result,
@@ -53,7 +52,7 @@ final class AsyncMessageDecorator<T> implements Message {
}
AsyncMessageDecorator(
- MapState<Long, Message> asyncOperationState, Long futureId, Message metadataMessage) {
+ PendingAsyncOperations asyncOperationState, Long futureId, Message metadataMessage) {
this.futureId = futureId;
this.pendingAsyncOperations = asyncOperationState;
this.message = metadataMessage;
@@ -94,11 +93,7 @@ final class AsyncMessageDecorator<T> implements Message {
@Override
public void postApply() {
- try {
- pendingAsyncOperations.remove(futureId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ pendingAsyncOperations.remove(source(), futureId);
}
@Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncOperationFailureNotifier.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncOperationFailureNotifier.java
index 6d393c9..89cc23a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncOperationFailureNotifier.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncOperationFailureNotifier.java
@@ -33,12 +33,11 @@ final class AsyncOperationFailureNotifier
static void fireExpiredAsyncOperations(
MapStateDescriptor<Long, Message> asyncOperationStateDescriptor,
Reductions reductions,
- MapState<Long, Message> asyncOperationState,
KeyedStateBackend<String> keyedStateBackend)
throws Exception {
AsyncOperationFailureNotifier asyncOperationFailureNotifier =
- new AsyncOperationFailureNotifier(reductions, asyncOperationState);
+ new AsyncOperationFailureNotifier(reductions);
keyedStateBackend.applyToAllKeys(
VoidNamespace.get(),
@@ -52,14 +51,11 @@ final class AsyncOperationFailureNotifier
}
private final Reductions reductions;
- private final MapState<Long, Message> asyncOperationState;
private boolean enqueued;
- private AsyncOperationFailureNotifier(
- Reductions reductions, MapState<Long, Message> asyncOperationState) {
+ private AsyncOperationFailureNotifier(Reductions reductions) {
this.reductions = Objects.requireNonNull(reductions);
- this.asyncOperationState = Objects.requireNonNull(asyncOperationState);
}
@Override
@@ -67,8 +63,7 @@ final class AsyncOperationFailureNotifier
for (Entry<Long, Message> entry : state.entries()) {
Long futureId = entry.getKey();
Message metadataMessage = entry.getValue();
- Message adaptor = new AsyncMessageDecorator(asyncOperationState, futureId, metadataMessage);
- reductions.enqueue(adaptor);
+ reductions.enqueueAsyncOperationAfterRestore(futureId, metadataMessage);
enqueued = true;
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
index aa8497b..873ab34 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
@@ -22,7 +22,6 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.flink.api.common.state.MapState;
import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
@@ -33,7 +32,7 @@ import org.apache.flink.statefun.flink.core.queue.MpscQueue;
import org.apache.flink.statefun.sdk.Address;
final class AsyncSink {
- private final MapState<Long, Message> pendingAsyncOperations;
+ private final PendingAsyncOperations pendingAsyncOperations;
private final Lazy<Reductions> reductions;
private final Executor operatorMailbox;
private final BackPressureValve backPressureValve;
@@ -42,7 +41,7 @@ final class AsyncSink {
@Inject
AsyncSink(
- @Label("async-operations") MapState<Long, Message> pendingAsyncOperations,
+ PendingAsyncOperations pendingAsyncOperations,
@Label("mailbox-executor") Executor operatorMailbox,
@Label("reductions") Lazy<Reductions> reductions,
@Label("backpressure-valve") BackPressureValve backPressureValve) {
@@ -60,11 +59,7 @@ final class AsyncSink {
// 2. after recovery, we clear that state by notifying the owning function that we don't know
// what happened
// with that particular async operation.
- try {
- pendingAsyncOperations.put(futureId, metadata);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ pendingAsyncOperations.add(metadata.source(), futureId, metadata);
backPressureValve.notifyAsyncOperationRegistered();
future.whenComplete((result, throwable) -> enqueue(metadata, futureId, result, throwable));
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 713368e..a5d7335 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
@@ -125,7 +126,13 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
// expire all the pending async operations.
//
AsyncOperationFailureNotifier.fireExpiredAsyncOperations(
- asyncOperationStateDescriptor, reductions, asyncOperationState, getKeyedStateBackend());
+ asyncOperationStateDescriptor, reductions, getKeyedStateBackend());
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ reductions.snapshotAsyncOperations();
}
// ------------------------------------------------------------------------------------------------------------------
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
index 7561541..27f7b10 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
@@ -45,10 +45,12 @@ import org.apache.flink.util.OutputTag;
final class Reductions {
private final LocalFunctionGroup localFunctionGroup;
+ private final PendingAsyncOperations pendingAsyncOperations;
@Inject
- Reductions(LocalFunctionGroup functionGroup) {
+ Reductions(PendingAsyncOperations pendingAsyncOperations, LocalFunctionGroup functionGroup) {
this.localFunctionGroup = Objects.requireNonNull(functionGroup);
+ this.pendingAsyncOperations = Objects.requireNonNull(pendingAsyncOperations);
}
static Reductions create(
@@ -117,6 +119,7 @@ final class Reductions {
// for the async operations
container.add("async-operations", MapState.class, asyncOperations);
container.add(AsyncSink.class);
+ container.add(PendingAsyncOperations.class);
container.add("backpressure-valve", BackPressureValve.class, valve);
@@ -132,10 +135,20 @@ final class Reductions {
localFunctionGroup.enqueue(message);
}
+ void enqueueAsyncOperationAfterRestore(Long futureId, Message metadataMessage) {
+ Message adaptor =
+ new AsyncMessageDecorator<>(pendingAsyncOperations, futureId, metadataMessage);
+ enqueue(adaptor);
+ }
+
@SuppressWarnings("StatementWithEmptyBody")
void processEnvelopes() {
while (localFunctionGroup.processNextEnvelope()) {
// TODO: consider preemption if too many local messages.
}
}
+
+ void snapshotAsyncOperations() {
+ pendingAsyncOperations.flush();
+ }
}