You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/21 16:22:42 UTC
[flink-statefun] 02/05: [FLINK-16063][core] Wire BackPressureValve
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 3bb5c652666eaaa469acab1c66bf4e9b9678c01e
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 20 22:11:49 2020 +0100
[FLINK-16063][core] Wire BackPressureValve
---
.../flink/statefun/flink/core/functions/FunctionGroupOperator.java | 5 +++++
.../org/apache/flink/statefun/flink/core/functions/Reductions.java | 4 ++++
.../apache/flink/statefun/flink/core/functions/ReductionsTest.java | 2 ++
3 files changed, 11 insertions(+)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 13a8dac..17162cc 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses;
+import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactory;
@@ -95,11 +96,15 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
Objects.requireNonNull(mailboxExecutor, "MailboxExecutor is unexpectedly NULL");
+ // TODO: once FLINK-16149 is merged, pass the threshold in via configuration instead of hard-coding it.
+ ThresholdBackPressureValve thresholdBackPressureValve = new ThresholdBackPressureValve(1_000);
+
//
// the core logic of applying messages to functions.
//
this.reductions =
Reductions.create(
+ thresholdBackPressureValve,
statefulFunctionsUniverse,
getRuntimeContext(),
getKeyedStateBackend(),
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
index fec2c92..7561541 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
+import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Lazy;
import org.apache.flink.statefun.flink.core.di.ObjectContainer;
@@ -51,6 +52,7 @@ final class Reductions {
}
static Reductions create(
+ BackPressureValve valve,
StatefulFunctionsUniverse statefulFunctionsUniverse,
RuntimeContext context,
KeyedStateBackend<Object> keyedStateBackend,
@@ -116,6 +118,8 @@ final class Reductions {
container.add("async-operations", MapState.class, asyncOperations);
container.add(AsyncSink.class);
+ container.add("backpressure-valve", BackPressureValve.class, valve);
+
return container.get(Reductions.class);
}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
index e279913..78687c3 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.TestUtils;
+import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -88,6 +89,7 @@ public class ReductionsTest {
public void testFactory() {
Reductions reductions =
Reductions.create(
+ new ThresholdBackPressureValve(-1),
new StatefulFunctionsUniverse(MessageFactoryType.WITH_KRYO_PAYLOADS),
new FakeRuntimeContext(),
new FakeKeyedStateBackend(),