You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/21 16:22:40 UTC

[flink-statefun] branch master updated (494f988 -> 61ab8db)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 494f988  [FLINK-16063] [config] Add ASYNC_MAX_OPERATIONS_PER_TASK config
     new f33669c  [FLINK-16063][core] Add a BackPressureValve
     new 3bb5c65  [FLINK-16063][core] Wire BackPressureValve
     new af6b87c  [FLINK-16063][core] Use BackpressureValve in AsyncSink
     new f3f5c72  [FLINK-16063][core] Add address blocking to ReusableContext
     new 61ab8db  [FLINK-16063][core] Apply back pressure in FunctionGroupOperator

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../{SystemNanoTimer.java => AsyncWaiter.java}     | 41 ++++-----
 .../flink/core/backpressure/BackPressureValve.java | 56 ++++++++++++
 .../backpressure/ThresholdBackPressureValve.java   | 99 ++++++++++++++++++++++
 .../statefun/flink/core/functions/AsyncSink.java   | 19 ++++-
 .../core/functions/FunctionGroupOperator.java      | 12 ++-
 .../statefun/flink/core/functions/Reductions.java  |  4 +
 .../flink/core/functions/ReusableContext.java      |  8 +-
 .../ThresholdBackPressureValveTest.java            | 92 ++++++++++++++++++++
 .../flink/core/functions/ReductionsTest.java       |  2 +
 9 files changed, 305 insertions(+), 28 deletions(-)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/{SystemNanoTimer.java => AsyncWaiter.java} (51%)
 create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java
 create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
 create mode 100644 statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValveTest.java


[flink-statefun] 05/05: [FLINK-16063][core] Apply back pressure in FunctionGroupOperator

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 61ab8db4b03ed5828f9a85cb0ed5b07c6c4d5ce3
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 20 22:44:53 2020 +0100

    [FLINK-16063][core] Apply back pressure in FunctionGroupOperator
    
    This closes #29.
---
 .../flink/core/functions/FunctionGroupOperator.java         | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 17162cc..713368e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses;
+import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
 import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
 import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
 import org.apache.flink.statefun.flink.core.message.Message;
@@ -55,6 +56,7 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
   // -- runtime
   private transient Reductions reductions;
   private transient MailboxExecutor mailboxExecutor;
+  private transient BackPressureValve backPressureValve;
 
   FunctionGroupOperator(
       Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs,
@@ -72,7 +74,10 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
   // ------------------------------------------------------------------------------------------------------------------
 
   @Override
-  public void processElement(StreamRecord<Message> record) {
+  public void processElement(StreamRecord<Message> record) throws InterruptedException {
+    while (backPressureValve.shouldBackPressure()) {
+      mailboxExecutor.yield();
+    }
     reductions.apply(record.getValue());
   }
 
@@ -96,15 +101,15 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
 
     Objects.requireNonNull(mailboxExecutor, "MailboxExecutor is unexpectedly NULL");
 
-    // TODO: once FLINK-16149 would be merged, we should pass the threshold as a configuration.
-    ThresholdBackPressureValve thresholdBackPressureValve = new ThresholdBackPressureValve(1_000);
+    this.backPressureValve =
+        new ThresholdBackPressureValve(configuration.getMaxAsyncOperationsPerTask());
 
     //
     // the core logic of applying messages to functions.
     //
     this.reductions =
         Reductions.create(
-            thresholdBackPressureValve,
+            backPressureValve,
             statefulFunctionsUniverse,
             getRuntimeContext(),
             getKeyedStateBackend(),


[flink-statefun] 03/05: [FLINK-16063][core] Use BackpressureValve in AsyncSink

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit af6b87c29b6b9d07444ed2c10bfc0f3b12810484
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 20 22:14:28 2020 +0100

    [FLINK-16063][core] Use BackpressureValve in AsyncSink
---
 .../statefun/flink/core/functions/AsyncSink.java      | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
index 6ba6d51..aa8497b 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
@@ -23,17 +23,20 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
 import org.apache.flink.statefun.flink.core.di.Inject;
 import org.apache.flink.statefun.flink.core.di.Label;
 import org.apache.flink.statefun.flink.core.di.Lazy;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.queue.Locks;
 import org.apache.flink.statefun.flink.core.queue.MpscQueue;
+import org.apache.flink.statefun.sdk.Address;
 
 final class AsyncSink {
   private final MapState<Long, Message> pendingAsyncOperations;
   private final Lazy<Reductions> reductions;
   private final Executor operatorMailbox;
+  private final BackPressureValve backPressureValve;
 
   private final MpscQueue<Message> completed = new MpscQueue<>(32768, Locks.jdkReentrantLock());
 
@@ -41,10 +44,12 @@ final class AsyncSink {
   AsyncSink(
       @Label("async-operations") MapState<Long, Message> pendingAsyncOperations,
       @Label("mailbox-executor") Executor operatorMailbox,
-      @Label("reductions") Lazy<Reductions> reductions) {
+      @Label("reductions") Lazy<Reductions> reductions,
+      @Label("backpressure-valve") BackPressureValve backPressureValve) {
     this.pendingAsyncOperations = Objects.requireNonNull(pendingAsyncOperations);
     this.reductions = Objects.requireNonNull(reductions);
     this.operatorMailbox = Objects.requireNonNull(operatorMailbox);
+    this.backPressureValve = Objects.requireNonNull(backPressureValve);
   }
 
   <T> void accept(Message metadata, CompletableFuture<T> future) {
@@ -60,9 +65,20 @@ final class AsyncSink {
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
+    backPressureValve.notifyAsyncOperationRegistered();
     future.whenComplete((result, throwable) -> enqueue(metadata, futureId, result, throwable));
   }
 
+  /**
+   * Requests to stop processing any further input for that address, as long as there is an
+   * uncompleted async operation (owned by @address).
+   *
+   * @param address the address
+   */
+  void blockAddress(Address address) {
+    backPressureValve.blockAddress(address);
+  }
+
   private <T> void enqueue(Message message, long futureId, T result, Throwable throwable) {
     AsyncMessageDecorator<T> decoratedMessage =
         new AsyncMessageDecorator<>(pendingAsyncOperations, futureId, message, result, throwable);
@@ -79,6 +95,7 @@ final class AsyncSink {
     Reductions reductions = this.reductions.get();
     Message message;
     while ((message = batchOfCompletedFutures.poll()) != null) {
+      backPressureValve.notifyAsyncOperationCompleted(message.target());
       reductions.enqueue(message);
     }
     reductions.processEnvelopes();


[flink-statefun] 01/05: [FLINK-16063][core] Add a BackPressureValve

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit f33669ce04ff9437f68e91fbbc5f99c75c513cfe
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 20 22:02:06 2020 +0100

    [FLINK-16063][core] Add a BackPressureValve
    
    This commit adds a mechanism to express back pressure
    management along with a simple threshold based implementation.
---
 .../flink/core/backpressure/BackPressureValve.java | 56 ++++++++++++
 .../backpressure/ThresholdBackPressureValve.java   | 99 ++++++++++++++++++++++
 .../ThresholdBackPressureValveTest.java            | 92 ++++++++++++++++++++
 3 files changed, 247 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java
new file mode 100644
index 0000000..29f1c19
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.backpressure;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.Address;
+
+public interface BackPressureValve {
+
+  /**
+   * Indicates rather a back pressure is needed.
+   *
+   * @return true if a back pressure should be applied.
+   */
+  boolean shouldBackPressure();
+
+  /**
+   * Notifies the back pressure mechanism that a async operation was registered via {@link
+   * org.apache.flink.statefun.sdk.Context#registerAsyncOperation(Object, CompletableFuture)}.
+   */
+  void notifyAsyncOperationRegistered();
+
+  /**
+   * Notifies when a async operation, registered by @owningAddress was completed.
+   *
+   * @param owningAddress the owner of the completed async operation.
+   */
+  void notifyAsyncOperationCompleted(Address owningAddress);
+
+  /**
+   * Requests to stop processing any further input for that address, as long as there is an
+   * uncompleted async operation (registered by @address).
+   *
+   * <p>NOTE: The address would unblocked as soon as some (one) async operation registered by that
+   * address completes.
+   *
+   * @param address the address
+   */
+  void blockAddress(Address address);
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
new file mode 100644
index 0000000..1ce0b00
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.backpressure;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashMap;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.Address;
+
+/**
+ * A simple Threshold based {@link BackPressureValve}.
+ *
+ * <p>There are two cases where a backpressure would be triggered:
+ *
+ * <ul>
+ *   <li>The total number of in-flight async operations in a StreamTask exceeds a predefined
+ *       threshold. This is tracked by {@link
+ *       ThresholdBackPressureValve#pendingAsynchronousOperationsCount}, it is incremented when an
+ *       async operation is registered, and decremented when it is completed.
+ *   <li>A specific address has requested to stop processing new inputs, this is tracked by the
+ *       {@link ThresholdBackPressureValve#blockedAddressSet}. The method {@link
+ *       ThresholdBackPressureValve#notifyAsyncOperationCompleted(Address)} is meant to be called
+ *       when ANY async operation has been completed.
+ * </ul>
+ */
+public final class ThresholdBackPressureValve implements BackPressureValve {
+  private final int maximumPendingAsynchronousOperations;
+
+  /**
+   * a set of address that had explicitly requested to stop processing any new inputs (via {@link
+   * AsyncWaiter#awaitAsyncOperationComplete()}. Note that this is a set implemented on top of a
+   * map, and the value (Boolean) has no meaning.
+   */
+  private final ObjectOpenHashMap<Address, Boolean> blockedAddressSet =
+      new ObjectOpenHashMap<>(1024);
+
+  private int pendingAsynchronousOperationsCount;
+
+  /**
+   * Constructs a ThresholdBackPressureValve.
+   *
+   * @param maximumPendingAsynchronousOperations the total allowed async operations to be inflight
+   *     per StreamTask, or {@code -1} to disable back pressure.
+   */
+  public ThresholdBackPressureValve(int maximumPendingAsynchronousOperations) {
+    this.maximumPendingAsynchronousOperations = maximumPendingAsynchronousOperations;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean shouldBackPressure() {
+    return totalPendingAsyncOperationsAtCapacity() || hasBlockedAddress();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void blockAddress(Address address) {
+    Objects.requireNonNull(address);
+    blockedAddressSet.put(address, Boolean.TRUE);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void notifyAsyncOperationRegistered() {
+    pendingAsynchronousOperationsCount++;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void notifyAsyncOperationCompleted(Address owningAddress) {
+    Objects.requireNonNull(owningAddress);
+    pendingAsynchronousOperationsCount = Math.max(0, pendingAsynchronousOperationsCount - 1);
+    blockedAddressSet.remove(owningAddress);
+  }
+
+  private boolean totalPendingAsyncOperationsAtCapacity() {
+    return maximumPendingAsynchronousOperations > 0
+        && pendingAsynchronousOperationsCount >= maximumPendingAsynchronousOperations;
+  }
+
+  private boolean hasBlockedAddress() {
+    return !blockedAddressSet.isEmpty();
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValveTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValveTest.java
new file mode 100644
index 0000000..4ececcf
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValveTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.backpressure;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.flink.statefun.flink.core.TestUtils;
+import org.junit.Test;
+
+public class ThresholdBackPressureValveTest {
+
+  @Test
+  public void simpleUsage() {
+    ThresholdBackPressureValve valve = new ThresholdBackPressureValve(2);
+
+    valve.notifyAsyncOperationRegistered();
+    valve.notifyAsyncOperationRegistered();
+
+    assertTrue(valve.shouldBackPressure());
+  }
+
+  @Test
+  public void completedOperationReleaseBackpressure() {
+    ThresholdBackPressureValve valve = new ThresholdBackPressureValve(1);
+
+    valve.notifyAsyncOperationRegistered();
+    valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_1_ADDR);
+
+    assertFalse(valve.shouldBackPressure());
+  }
+
+  @Test
+  public void blockAddressTriggerBackpressure() {
+    ThresholdBackPressureValve valve = new ThresholdBackPressureValve(500);
+
+    valve.blockAddress(TestUtils.FUNCTION_1_ADDR);
+
+    assertTrue(valve.shouldBackPressure());
+  }
+
+  @Test
+  public void blockingAndUnblockingAddress() {
+    ThresholdBackPressureValve valve = new ThresholdBackPressureValve(500);
+
+    valve.blockAddress(TestUtils.FUNCTION_1_ADDR);
+    valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_1_ADDR);
+
+    assertFalse(valve.shouldBackPressure());
+  }
+
+  @Test
+  public void unblockingDifferentAddressStillBackpressures() {
+    ThresholdBackPressureValve valve = new ThresholdBackPressureValve(500);
+
+    valve.blockAddress(TestUtils.FUNCTION_1_ADDR);
+    valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_2_ADDR);
+
+    assertTrue(valve.shouldBackPressure());
+  }
+
+  @Test
+  public void blockTwoAddress() {
+    ThresholdBackPressureValve valve = new ThresholdBackPressureValve(500);
+
+    valve.blockAddress(TestUtils.FUNCTION_1_ADDR);
+    valve.blockAddress(TestUtils.FUNCTION_2_ADDR);
+    assertTrue(valve.shouldBackPressure());
+
+    valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_1_ADDR);
+    assertTrue(valve.shouldBackPressure());
+
+    valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_2_ADDR);
+    assertFalse(valve.shouldBackPressure());
+  }
+}


[flink-statefun] 04/05: [FLINK-16063][core] Add address blocking to ReusableContext

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit f3f5c729502d333b0410ea84689234831e9b3a2b
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 20 22:26:45 2020 +0100

    [FLINK-16063][core] Add address blocking to ReusableContext
    
    This commit introdcues an internal interface to be used by
    internal built in functions (like HttpFunction) to signal
    to the runtime, that it can not take any more input.
---
 .../flink/core/backpressure/AsyncWaiter.java       | 39 ++++++++++++++++++++++
 .../flink/core/functions/ReusableContext.java      |  8 ++++-
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java
new file mode 100644
index 0000000..ddbe247
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.backpressure;
+
+import org.apache.flink.annotation.Internal;
+
+@Internal
+public interface AsyncWaiter {
+
+  /**
+   * Signals the runtime to stop invoking the currently executing function with new input until at
+   * least one {@link org.apache.flink.statefun.sdk.AsyncOperationResult} belonging to this function
+   * would be delivered.
+   *
+   * <p>NOTE: If a function would request to block without actually registering any async operations
+   * either previously or during its current invocation, then it would remain blocked. Since this is
+   * an internal API to be used by the remote functions we don't do anything to prevent that.
+   *
+   * <p>If we would like it to be a part of the SDK then we would have to make sure that we track
+   * every async operation registered per each address.
+   */
+  void awaitAsyncOperationComplete();
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
index 1408d8b..55c646b 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.statefun.flink.core.functions;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
 import org.apache.flink.statefun.flink.core.di.Inject;
 import org.apache.flink.statefun.flink.core.di.Label;
 import org.apache.flink.statefun.flink.core.message.Message;
@@ -28,7 +29,7 @@ import org.apache.flink.statefun.flink.core.state.State;
 import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 
-final class ReusableContext implements ApplyingContext {
+final class ReusableContext implements ApplyingContext, AsyncWaiter {
   private final Partition thisPartition;
   private final LocalSink localSink;
   private final RemoteSink remoteSink;
@@ -116,6 +117,11 @@ final class ReusableContext implements ApplyingContext {
   }
 
   @Override
+  public void awaitAsyncOperationComplete() {
+    asyncSink.blockAddress(self());
+  }
+
+  @Override
   public Address caller() {
     return in.source();
   }


[flink-statefun] 02/05: [FLINK-16063][core] Wire BackPressureValve

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 3bb5c652666eaaa469acab1c66bf4e9b9678c01e
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 20 22:11:49 2020 +0100

    [FLINK-16063][core] Wire BackPressureValve
---
 .../flink/statefun/flink/core/functions/FunctionGroupOperator.java   | 5 +++++
 .../org/apache/flink/statefun/flink/core/functions/Reductions.java   | 4 ++++
 .../apache/flink/statefun/flink/core/functions/ReductionsTest.java   | 2 ++
 3 files changed, 11 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 13a8dac..17162cc 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses;
+import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
 import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.message.MessageFactory;
@@ -95,11 +96,15 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
 
     Objects.requireNonNull(mailboxExecutor, "MailboxExecutor is unexpectedly NULL");
 
+    // TODO: once FLINK-16149 would be merged, we should pass the threshold as a configuration.
+    ThresholdBackPressureValve thresholdBackPressureValve = new ThresholdBackPressureValve(1_000);
+
     //
     // the core logic of applying messages to functions.
     //
     this.reductions =
         Reductions.create(
+            thresholdBackPressureValve,
             statefulFunctionsUniverse,
             getRuntimeContext(),
             getKeyedStateBackend(),
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
index fec2c92..7561541 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
+import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
 import org.apache.flink.statefun.flink.core.di.Inject;
 import org.apache.flink.statefun.flink.core.di.Lazy;
 import org.apache.flink.statefun.flink.core.di.ObjectContainer;
@@ -51,6 +52,7 @@ final class Reductions {
   }
 
   static Reductions create(
+      BackPressureValve valve,
       StatefulFunctionsUniverse statefulFunctionsUniverse,
       RuntimeContext context,
       KeyedStateBackend<Object> keyedStateBackend,
@@ -116,6 +118,8 @@ final class Reductions {
     container.add("async-operations", MapState.class, asyncOperations);
     container.add(AsyncSink.class);
 
+    container.add("backpressure-valve", BackPressureValve.class, valve);
+
     return container.get(Reductions.class);
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
index e279913..78687c3 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.TestUtils;
+import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -88,6 +89,7 @@ public class ReductionsTest {
   public void testFactory() {
     Reductions reductions =
         Reductions.create(
+            new ThresholdBackPressureValve(-1),
             new StatefulFunctionsUniverse(MessageFactoryType.WITH_KRYO_PAYLOADS),
             new FakeRuntimeContext(),
             new FakeKeyedStateBackend(),