You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/08 10:28:42 UTC

[flink-statefun] branch master updated (09b4d4f -> 9c115e7)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 09b4d4f  [hotfix] [docs] Upgrade Flink version in docs config
     add 78cbc19  [FLINK-19130] [core] Extend metric interfaces for backpressure metrics
     add 6408866  [FLINK-19130] [core] Let StatefulFunctionsRepository extend FunctionTypeMetricsRepository
     add 0fbb17a  [FLINK-19130] [core] Allow ObjectContainer to add alias keys
     add 6c77ca4  [FLINK-19130] [core] Wire-in new metrics into AsyncSink
     add 6bb5cde  [FLINK-19130] [core] Rename MetricsFactory to FunctionTypeMetricsFactory
     add 7ed411d  [FLINK-19130] [core] Rename AsyncWaiter to InternalContext
     add de60b64  [FLINK-19130] [core] Expose function type metrics via InternalContext
     add eb63dc5  [FLINK-19130] [core] Add backlog messages to FunctionTypeMetrics
     new 9c115e7  [FLINK-19130] [core] Apply backlog metrics in RequestReplyFunction

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/core/backpressure/BackPressureValve.java |  8 +++
 .../{AsyncWaiter.java => InternalContext.java}     | 11 ++-
 .../backpressure/ThresholdBackPressureValve.java   |  8 ++-
 .../statefun/flink/core/di/ObjectContainer.java    |  8 +++
 .../statefun/flink/core/functions/AsyncSink.java   | 31 ++++++--
 .../statefun/flink/core/functions/Reductions.java  | 21 +++++-
 .../flink/core/functions/ReusableContext.java      | 12 +++-
 .../core/functions/StatefulFunctionRepository.java | 17 +++--
 ...ry.java => FlinkFuncionTypeMetricsFactory.java} |  4 +-
 ...ry.java => FlinkFunctionDispatcherMetrics.java} | 23 +++---
 .../core/metrics/FlinkFunctionTypeMetrics.java     | 36 ++++++++++
 ...Factory.java => FuncionTypeMetricsFactory.java} |  2 +-
 ...Factory.java => FunctionDispatcherMetrics.java} |  7 +-
 .../flink/core/metrics/FunctionTypeMetrics.java    | 12 ++++
 ...ory.java => FunctionTypeMetricsRepository.java} |  6 +-
 .../flink/core/reqreply/RequestReplyFunction.java  | 21 +++---
 .../flink/core/di/ObjectContainerTest.java}        | 31 ++++----
 .../flink/core/functions/ReductionsTest.java       |  3 +-
 .../core/reqreply/RequestReplyFunctionTest.java    | 82 +++++++++++++++++++++-
 19 files changed, 279 insertions(+), 64 deletions(-)
 rename statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/{AsyncWaiter.java => InternalContext.java} (81%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/{FlinkMetricsFactory.java => FlinkFuncionTypeMetricsFactory.java} (90%)
 rename statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/{FlinkMetricsFactory.java => FlinkFunctionDispatcherMetrics.java} (63%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/{MetricsFactory.java => FuncionTypeMetricsFactory.java} (95%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/{MetricsFactory.java => FunctionDispatcherMetrics.java} (85%)
 rename statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/{MetricsFactory.java => FunctionTypeMetricsRepository.java} (89%)
 copy statefun-flink/{statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/common/ReflectionUtilTest.java => statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/di/ObjectContainerTest.java} (58%)


[flink-statefun] 01/01: [FLINK-19130] [core] Apply backlog metrics in RequestReplyFunction

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 9c115e75ac8a7f409b8094dd25f6b880c1ba8cfb
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Sep 8 14:19:01 2020 +0800

    [FLINK-19130] [core] Apply backlog metrics in RequestReplyFunction
    
    This closes #142.
---
 .../flink/core/reqreply/RequestReplyFunction.java  | 19 ++++----
 .../core/reqreply/RequestReplyFunctionTest.java    | 54 ++++++++++++++++++----
 2 files changed, 56 insertions(+), 17 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index bac18c4..a254d43 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -79,17 +79,18 @@ public final class RequestReplyFunction implements StatefulFunction {
 
   @Override
   public void invoke(Context context, Object input) {
+    InternalContext castedContext = (InternalContext) context;
     if (!(input instanceof AsyncOperationResult)) {
-      onRequest(context, (Any) input);
+      onRequest(castedContext, (Any) input);
       return;
     }
     @SuppressWarnings("unchecked")
     AsyncOperationResult<ToFunction, FromFunction> result =
         (AsyncOperationResult<ToFunction, FromFunction>) input;
-    onAsyncResult(context, result);
+    onAsyncResult(castedContext, result);
   }
 
-  private void onRequest(Context context, Any message) {
+  private void onRequest(InternalContext context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
     int inflightOrBatched = requestState.getOrDefault(-1);
     if (inflightOrBatched < 0) {
@@ -106,17 +107,18 @@ public final class RequestReplyFunction implements StatefulFunction {
     batch.append(invocationBuilder.build());
     inflightOrBatched++;
     requestState.set(inflightOrBatched);
+    context.functionTypeMetrics().appendBacklogMessages(1);
     if (isMaxNumBatchRequestsExceeded(inflightOrBatched)) {
       // we are at capacity, can't add anything to the batch.
       // we need to signal to the runtime that we are unable to process any new input
       // and we must wait for our in flight asynchronous operation to complete before
       // we are able to process more input.
-      ((InternalContext) context).awaitAsyncOperationComplete();
+      context.awaitAsyncOperationComplete();
     }
   }
 
   private void onAsyncResult(
-      Context context, AsyncOperationResult<ToFunction, FromFunction> asyncResult) {
+      InternalContext context, AsyncOperationResult<ToFunction, FromFunction> asyncResult) {
     if (asyncResult.unknown()) {
       ToFunction batch = asyncResult.metadata();
       sendToFunction(context, batch);
@@ -125,10 +127,10 @@ public final class RequestReplyFunction implements StatefulFunction {
     InvocationResponse invocationResult = unpackInvocationOrThrow(context.self(), asyncResult);
     handleInvocationResponse(context, invocationResult);
 
-    final int state = requestState.getOrDefault(-1);
-    if (state < 0) {
+    final int numBatched = requestState.getOrDefault(-1);
+    if (numBatched < 0) {
       throw new IllegalStateException("Got an unexpected async result");
-    } else if (state == 0) {
+    } else if (numBatched == 0) {
       requestState.clear();
     } else {
       final InvocationBatchRequest.Builder nextBatch = getNextBatch();
@@ -139,6 +141,7 @@ public final class RequestReplyFunction implements StatefulFunction {
       // b) sending the accumulated batch to the remote function.
       requestState.set(0);
       batch.clear();
+      context.functionTypeMetrics().consumeBacklogMessages(numBatched);
       sendToFunction(context, nextBatch);
     }
   }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index cd70588..d4d341a 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -209,6 +209,32 @@ public class RequestReplyFunctionTest {
         new EgressIdentifier<>("org.foo", "bar", Any.class), context.egresses.get(0).getKey());
   }
 
+  @Test
+  public void backlogMetricsIncreasedOnInvoke() {
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+    // following should be accounted into backlog metrics
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+    assertThat(context.functionTypeMetrics().numBacklog, is(2));
+  }
+
+  @Test
+  public void backlogMetricsDecreasedOnNextSuccess() {
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+    // following should be accounted into backlog metrics
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+    // complete one message, should fully consume backlog
+    context.needsWaiting = false;
+    functionUnderTest.invoke(context, successfulAsyncOperation());
+
+    assertThat(context.functionTypeMetrics().numBacklog, is(0));
+  }
+
   private static AsyncOperationResult<Object, FromFunction> successfulAsyncOperation() {
     return new AsyncOperationResult<>(
         new Object(), Status.SUCCESS, FromFunction.getDefaultInstance(), null);
@@ -251,7 +277,7 @@ public class RequestReplyFunctionTest {
 
   private static final class FakeContext implements InternalContext {
 
-    private final FunctionTypeMetrics fakeMetrics = new FakeMetrics();
+    private final BacklogTrackingMetrics fakeMetrics = new BacklogTrackingMetrics();
 
     Address caller;
     boolean needsWaiting;
@@ -266,7 +292,7 @@ public class RequestReplyFunctionTest {
     }
 
     @Override
-    public FunctionTypeMetrics functionTypeMetrics() {
+    public BacklogTrackingMetrics functionTypeMetrics() {
       return fakeMetrics;
     }
 
@@ -297,7 +323,23 @@ public class RequestReplyFunctionTest {
     public <M, T> void registerAsyncOperation(M metadata, CompletableFuture<T> future) {}
   }
 
-  private static final class FakeMetrics implements FunctionTypeMetrics {
+  private static final class BacklogTrackingMetrics implements FunctionTypeMetrics {
+
+    private int numBacklog = 0;
+
+    public int numBacklog() {
+      return numBacklog;
+    }
+
+    @Override
+    public void appendBacklogMessages(int count) {
+      numBacklog += count;
+    }
+
+    @Override
+    public void consumeBacklogMessages(int count) {
+      numBacklog -= count;
+    }
 
     @Override
     public void asyncOperationRegistered() {}
@@ -322,11 +364,5 @@ public class RequestReplyFunctionTest {
 
     @Override
     public void unblockedAddress() {}
-
-    @Override
-    public void appendBacklogMessages(int count) {}
-
-    @Override
-    public void consumeBacklogMessages(int count) {}
   }
 }