You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/08 10:28:43 UTC
[flink-statefun] 01/01: [FLINK-19130] [core] Apply backlog metrics
in RequestReplyFunction
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 9c115e75ac8a7f409b8094dd25f6b880c1ba8cfb
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Sep 8 14:19:01 2020 +0800
[FLINK-19130] [core] Apply backlog metrics in RequestReplyFunction
This closes #142.
---
.../flink/core/reqreply/RequestReplyFunction.java | 19 ++++----
.../core/reqreply/RequestReplyFunctionTest.java | 54 ++++++++++++++++++----
2 files changed, 56 insertions(+), 17 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index bac18c4..a254d43 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -79,17 +79,18 @@ public final class RequestReplyFunction implements StatefulFunction {
@Override
public void invoke(Context context, Object input) {
+ InternalContext castedContext = (InternalContext) context;
if (!(input instanceof AsyncOperationResult)) {
- onRequest(context, (Any) input);
+ onRequest(castedContext, (Any) input);
return;
}
@SuppressWarnings("unchecked")
AsyncOperationResult<ToFunction, FromFunction> result =
(AsyncOperationResult<ToFunction, FromFunction>) input;
- onAsyncResult(context, result);
+ onAsyncResult(castedContext, result);
}
- private void onRequest(Context context, Any message) {
+ private void onRequest(InternalContext context, Any message) {
Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
int inflightOrBatched = requestState.getOrDefault(-1);
if (inflightOrBatched < 0) {
@@ -106,17 +107,18 @@ public final class RequestReplyFunction implements StatefulFunction {
batch.append(invocationBuilder.build());
inflightOrBatched++;
requestState.set(inflightOrBatched);
+ context.functionTypeMetrics().appendBacklogMessages(1);
if (isMaxNumBatchRequestsExceeded(inflightOrBatched)) {
// we are at capacity, can't add anything to the batch.
// we need to signal to the runtime that we are unable to process any new input
// and we must wait for our in flight asynchronous operation to complete before
// we are able to process more input.
- ((InternalContext) context).awaitAsyncOperationComplete();
+ context.awaitAsyncOperationComplete();
}
}
private void onAsyncResult(
- Context context, AsyncOperationResult<ToFunction, FromFunction> asyncResult) {
+ InternalContext context, AsyncOperationResult<ToFunction, FromFunction> asyncResult) {
if (asyncResult.unknown()) {
ToFunction batch = asyncResult.metadata();
sendToFunction(context, batch);
@@ -125,10 +127,10 @@ public final class RequestReplyFunction implements StatefulFunction {
InvocationResponse invocationResult = unpackInvocationOrThrow(context.self(), asyncResult);
handleInvocationResponse(context, invocationResult);
- final int state = requestState.getOrDefault(-1);
- if (state < 0) {
+ final int numBatched = requestState.getOrDefault(-1);
+ if (numBatched < 0) {
throw new IllegalStateException("Got an unexpected async result");
- } else if (state == 0) {
+ } else if (numBatched == 0) {
requestState.clear();
} else {
final InvocationBatchRequest.Builder nextBatch = getNextBatch();
@@ -139,6 +141,7 @@ public final class RequestReplyFunction implements StatefulFunction {
// b) sending the accumulated batch to the remote function.
requestState.set(0);
batch.clear();
+ context.functionTypeMetrics().consumeBacklogMessages(numBatched);
sendToFunction(context, nextBatch);
}
}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index cd70588..d4d341a 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -209,6 +209,32 @@ public class RequestReplyFunctionTest {
new EgressIdentifier<>("org.foo", "bar", Any.class), context.egresses.get(0).getKey());
}
+ @Test
+ public void backlogMetricsIncreasedOnInvoke() {
+ functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+ // following should be accounted into backlog metrics
+ functionUnderTest.invoke(context, Any.getDefaultInstance());
+ functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+ assertThat(context.functionTypeMetrics().numBacklog, is(2));
+ }
+
+ @Test
+ public void backlogMetricsDecreasedOnNextSuccess() {
+ functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+ // following should be accounted into backlog metrics
+ functionUnderTest.invoke(context, Any.getDefaultInstance());
+ functionUnderTest.invoke(context, Any.getDefaultInstance());
+
+ // complete one message, should fully consume backlog
+ context.needsWaiting = false;
+ functionUnderTest.invoke(context, successfulAsyncOperation());
+
+ assertThat(context.functionTypeMetrics().numBacklog, is(0));
+ }
+
private static AsyncOperationResult<Object, FromFunction> successfulAsyncOperation() {
return new AsyncOperationResult<>(
new Object(), Status.SUCCESS, FromFunction.getDefaultInstance(), null);
@@ -251,7 +277,7 @@ public class RequestReplyFunctionTest {
private static final class FakeContext implements InternalContext {
- private final FunctionTypeMetrics fakeMetrics = new FakeMetrics();
+ private final BacklogTrackingMetrics fakeMetrics = new BacklogTrackingMetrics();
Address caller;
boolean needsWaiting;
@@ -266,7 +292,7 @@ public class RequestReplyFunctionTest {
}
@Override
- public FunctionTypeMetrics functionTypeMetrics() {
+ public BacklogTrackingMetrics functionTypeMetrics() {
return fakeMetrics;
}
@@ -297,7 +323,23 @@ public class RequestReplyFunctionTest {
public <M, T> void registerAsyncOperation(M metadata, CompletableFuture<T> future) {}
}
- private static final class FakeMetrics implements FunctionTypeMetrics {
+ private static final class BacklogTrackingMetrics implements FunctionTypeMetrics {
+
+ private int numBacklog = 0;
+
+ public int numBacklog() {
+ return numBacklog;
+ }
+
+ @Override
+ public void appendBacklogMessages(int count) {
+ numBacklog += count;
+ }
+
+ @Override
+ public void consumeBacklogMessages(int count) {
+ numBacklog -= count;
+ }
@Override
public void asyncOperationRegistered() {}
@@ -322,11 +364,5 @@ public class RequestReplyFunctionTest {
@Override
public void unblockedAddress() {}
-
- @Override
- public void appendBacklogMessages(int count) {}
-
- @Override
- public void consumeBacklogMessages(int count) {}
}
}