You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/21 16:22:45 UTC

[flink-statefun] 05/05: [FLINK-16063][core] Apply back pressure in FunctionGroupOperator

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 61ab8db4b03ed5828f9a85cb0ed5b07c6c4d5ce3
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 20 22:44:53 2020 +0100

    [FLINK-16063][core] Apply back pressure in FunctionGroupOperator
    
    This closes #29.
---
 .../flink/core/functions/FunctionGroupOperator.java         | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 17162cc..713368e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses;
+import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
 import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
 import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
 import org.apache.flink.statefun.flink.core.message.Message;
@@ -55,6 +56,7 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
   // -- runtime
   private transient Reductions reductions;
   private transient MailboxExecutor mailboxExecutor;
+  private transient BackPressureValve backPressureValve;
 
   FunctionGroupOperator(
       Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs,
@@ -72,7 +74,10 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
   // ------------------------------------------------------------------------------------------------------------------
 
   @Override
-  public void processElement(StreamRecord<Message> record) {
+  public void processElement(StreamRecord<Message> record) throws InterruptedException {
+    while (backPressureValve.shouldBackPressure()) {
+      mailboxExecutor.yield();
+    }
     reductions.apply(record.getValue());
   }
 
@@ -96,15 +101,15 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
 
     Objects.requireNonNull(mailboxExecutor, "MailboxExecutor is unexpectedly NULL");
 
-    // TODO: once FLINK-16149 would be merged, we should pass the threshold as a configuration.
-    ThresholdBackPressureValve thresholdBackPressureValve = new ThresholdBackPressureValve(1_000);
+    this.backPressureValve =
+        new ThresholdBackPressureValve(configuration.getMaxAsyncOperationsPerTask());
 
     //
     // the core logic of applying messages to functions.
     //
     this.reductions =
         Reductions.create(
-            thresholdBackPressureValve,
+            backPressureValve,
             statefulFunctionsUniverse,
             getRuntimeContext(),
             getKeyedStateBackend(),