You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/21 16:22:42 UTC

[flink-statefun] 02/05: [FLINK-16063][core] Wire BackPressureValve

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 3bb5c652666eaaa469acab1c66bf4e9b9678c01e
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 20 22:11:49 2020 +0100

    [FLINK-16063][core] Wire BackPressureValve
---
 .../flink/statefun/flink/core/functions/FunctionGroupOperator.java   | 5 +++++
 .../org/apache/flink/statefun/flink/core/functions/Reductions.java   | 4 ++++
 .../apache/flink/statefun/flink/core/functions/ReductionsTest.java   | 2 ++
 3 files changed, 11 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 13a8dac..17162cc 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses;
+import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
 import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.message.MessageFactory;
@@ -95,11 +96,15 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
 
     Objects.requireNonNull(mailboxExecutor, "MailboxExecutor is unexpectedly NULL");
 
+    // TODO: once FLINK-16149 is merged, the threshold should be passed in via configuration.
+    ThresholdBackPressureValve thresholdBackPressureValve = new ThresholdBackPressureValve(1_000);
+
     //
     // the core logic of applying messages to functions.
     //
     this.reductions =
         Reductions.create(
+            thresholdBackPressureValve,
             statefulFunctionsUniverse,
             getRuntimeContext(),
             getKeyedStateBackend(),
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
index fec2c92..7561541 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
+import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
 import org.apache.flink.statefun.flink.core.di.Inject;
 import org.apache.flink.statefun.flink.core.di.Lazy;
 import org.apache.flink.statefun.flink.core.di.ObjectContainer;
@@ -51,6 +52,7 @@ final class Reductions {
   }
 
   static Reductions create(
+      BackPressureValve valve,
       StatefulFunctionsUniverse statefulFunctionsUniverse,
       RuntimeContext context,
       KeyedStateBackend<Object> keyedStateBackend,
@@ -116,6 +118,8 @@ final class Reductions {
     container.add("async-operations", MapState.class, asyncOperations);
     container.add(AsyncSink.class);
 
+    container.add("backpressure-valve", BackPressureValve.class, valve);
+
     return container.get(Reductions.class);
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
index e279913..78687c3 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.TestUtils;
+import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -88,6 +89,7 @@ public class ReductionsTest {
   public void testFactory() {
     Reductions reductions =
         Reductions.create(
+            new ThresholdBackPressureValve(-1),
             new StatefulFunctionsUniverse(MessageFactoryType.WITH_KRYO_PAYLOADS),
             new FakeRuntimeContext(),
             new FakeKeyedStateBackend(),