You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/08 10:28:43 UTC

[flink-statefun] 01/01: [FLINK-19130] [core] Apply backlog metrics in RequestReplyFunction

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 9c115e75ac8a7f409b8094dd25f6b880c1ba8cfb
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Sep 8 14:19:01 2020 +0800

    [FLINK-19130] [core] Apply backlog metrics in RequestReplyFunction
    
    This closes #142.
---
 .../flink/core/reqreply/RequestReplyFunction.java  | 19 ++++----
 .../core/reqreply/RequestReplyFunctionTest.java    | 54 ++++++++++++++++++----
 2 files changed, 56 insertions(+), 17 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index bac18c4..a254d43 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -79,17 +79,18 @@ public final class RequestReplyFunction implements StatefulFunction {
 
   @Override
   public void invoke(Context context, Object input) {
+    InternalContext castedContext = (InternalContext) context;
     if (!(input instanceof AsyncOperationResult)) {
-      onRequest(context, (Any) input);
+      onRequest(castedContext, (Any) input);
       return;
     }
     @SuppressWarnings("unchecked")
     AsyncOperationResult<ToFunction, FromFunction> result =
         (AsyncOperationResult<ToFunction, FromFunction>) input;
-    onAsyncResult(context, result);
+    onAsyncResult(castedContext, result);
   }
 
-  private void onRequest(Context context, Any message) {
+  private void onRequest(InternalContext context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
     int inflightOrBatched = requestState.getOrDefault(-1);
     if (inflightOrBatched < 0) {
@@ -106,17 +107,18 @@ public final class RequestReplyFunction implements StatefulFunction {
     batch.append(invocationBuilder.build());
     inflightOrBatched++;
     requestState.set(inflightOrBatched);
+    context.functionTypeMetrics().appendBacklogMessages(1);
     if (isMaxNumBatchRequestsExceeded(inflightOrBatched)) {
       // we are at capacity, can't add anything to the batch.
       // we need to signal to the runtime that we are unable to process any new input
       // and we must wait for our in flight asynchronous operation to complete before
       // we are able to process more input.
-      ((InternalContext) context).awaitAsyncOperationComplete();
+      context.awaitAsyncOperationComplete();
     }
   }
 
   private void onAsyncResult(
-      Context context, AsyncOperationResult<ToFunction, FromFunction> asyncResult) {
+      InternalContext context, AsyncOperationResult<ToFunction, FromFunction> asyncResult) {
     if (asyncResult.unknown()) {
       ToFunction batch = asyncResult.metadata();
       sendToFunction(context, batch);
@@ -125,10 +127,10 @@ public final class RequestReplyFunction implements StatefulFunction {
     InvocationResponse invocationResult = unpackInvocationOrThrow(context.self(), asyncResult);
     handleInvocationResponse(context, invocationResult);
 
-    final int state = requestState.getOrDefault(-1);
-    if (state < 0) {
+    final int numBatched = requestState.getOrDefault(-1);
+    if (numBatched < 0) {
       throw new IllegalStateException("Got an unexpected async result");
-    } else if (state == 0) {
+    } else if (numBatched == 0) {
       requestState.clear();
     } else {
       final InvocationBatchRequest.Builder nextBatch = getNextBatch();
@@ -139,6 +141,7 @@ public final class RequestReplyFunction implements StatefulFunction {
       // b) sending the accumulated batch to the remote function.
       requestState.set(0);
       batch.clear();
+      context.functionTypeMetrics().consumeBacklogMessages(numBatched);
       sendToFunction(context, nextBatch);
     }
   }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index cd70588..d4d341a 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -209,6 +209,32 @@ public class RequestReplyFunctionTest {
         new EgressIdentifier<>("org.foo", "bar", Any.class), context.egresses.get(0).getKey());
   }
 
+  @Test
+  public void backlogMetricsIncreasedOnInvoke() {
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+    // following should be accounted into backlog metrics
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+    assertThat(context.functionTypeMetrics().numBacklog, is(2));
+  }
+
+  @Test
+  public void backlogMetricsDecreasedOnNextSuccess() {
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+    // following should be accounted into backlog metrics
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+    functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+    // complete one message, should fully consume backlog
+    context.needsWaiting = false;
+    functionUnderTest.invoke(context, successfulAsyncOperation());
+
+    assertThat(context.functionTypeMetrics().numBacklog, is(0));
+  }
+
   private static AsyncOperationResult<Object, FromFunction> successfulAsyncOperation() {
     return new AsyncOperationResult<>(
         new Object(), Status.SUCCESS, FromFunction.getDefaultInstance(), null);
@@ -251,7 +277,7 @@ public class RequestReplyFunctionTest {
 
   private static final class FakeContext implements InternalContext {
 
-    private final FunctionTypeMetrics fakeMetrics = new FakeMetrics();
+    private final BacklogTrackingMetrics fakeMetrics = new BacklogTrackingMetrics();
 
     Address caller;
     boolean needsWaiting;
@@ -266,7 +292,7 @@ public class RequestReplyFunctionTest {
     }
 
     @Override
-    public FunctionTypeMetrics functionTypeMetrics() {
+    public BacklogTrackingMetrics functionTypeMetrics() {
       return fakeMetrics;
     }
 
@@ -297,7 +323,23 @@ public class RequestReplyFunctionTest {
     public <M, T> void registerAsyncOperation(M metadata, CompletableFuture<T> future) {}
   }
 
-  private static final class FakeMetrics implements FunctionTypeMetrics {
+  private static final class BacklogTrackingMetrics implements FunctionTypeMetrics {
+
+    private int numBacklog = 0;
+
+    public int numBacklog() {
+      return numBacklog;
+    }
+
+    @Override
+    public void appendBacklogMessages(int count) {
+      numBacklog += count;
+    }
+
+    @Override
+    public void consumeBacklogMessages(int count) {
+      numBacklog -= count;
+    }
 
     @Override
     public void asyncOperationRegistered() {}
@@ -322,11 +364,5 @@ public class RequestReplyFunctionTest {
 
     @Override
     public void unblockedAddress() {}
-
-    @Override
-    public void appendBacklogMessages(int count) {}
-
-    @Override
-    public void consumeBacklogMessages(int count) {}
   }
 }