You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/21 16:22:45 UTC
[flink-statefun] 05/05: [FLINK-16063][core] Apply back pressure in FunctionGroupOperator
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 61ab8db4b03ed5828f9a85cb0ed5b07c6c4d5ce3
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 20 22:44:53 2020 +0100
[FLINK-16063][core] Apply back pressure in FunctionGroupOperator
This closes #29.
---
.../flink/core/functions/FunctionGroupOperator.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 17162cc..713368e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses;
+import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
import org.apache.flink.statefun.flink.core.message.Message;
@@ -55,6 +56,7 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
// -- runtime
private transient Reductions reductions;
private transient MailboxExecutor mailboxExecutor;
+ private transient BackPressureValve backPressureValve;
FunctionGroupOperator(
Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs,
@@ -72,7 +74,10 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
// ------------------------------------------------------------------------------------------------------------------
@Override
- public void processElement(StreamRecord<Message> record) {
+ public void processElement(StreamRecord<Message> record) throws InterruptedException {
+ while (backPressureValve.shouldBackPressure()) {
+ mailboxExecutor.yield();
+ }
reductions.apply(record.getValue());
}
@@ -96,15 +101,15 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
Objects.requireNonNull(mailboxExecutor, "MailboxExecutor is unexpectedly NULL");
- // TODO: once FLINK-16149 would be merged, we should pass the threshold as a configuration.
- ThresholdBackPressureValve thresholdBackPressureValve = new ThresholdBackPressureValve(1_000);
+ this.backPressureValve =
+ new ThresholdBackPressureValve(configuration.getMaxAsyncOperationsPerTask());
//
// the core logic of applying messages to functions.
//
this.reductions =
Reductions.create(
- thresholdBackPressureValve,
+ backPressureValve,
statefulFunctionsUniverse,
getRuntimeContext(),
getKeyedStateBackend(),