You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/10 08:46:18 UTC

[flink-statefun] branch master updated (9c115e7 -> ae57f9e)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 9c115e7  [FLINK-19130] [core] Apply backlog metrics in RequestReplyFunction
     add 7ccb866  [FLINK-19017] Add a RequestSummary class
     add 7ce1a6e   [FLINK-19017] Use ToFunctionRequestSummary in RequestReplyClient
     add dbaeb0a  [FLINK-19017] Pass RequestSummary to the retrying callback
     add 048f850  [FLINK-19017] Log retriable exceptions
     add 58c18a3  [FLINK-19017] Add Dropwizard metrics
     add efbeab8  [FLINK-19017] Add an interface for remote function metrics
     add 0b5b058  [FLINK-19017] Implement the remote function metrics interface
     add 0db114a  [FLINK-19017] Add remote function metrics to RequestReplyClient
     new 85d064a  [FLINK-19017] Report remote function invocation metrics
     new ae57f9e  [hotfix] Fix spotless style violations

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 statefun-flink/pom.xml                             | 15 ++++++++
 statefun-flink/statefun-flink-core/pom.xml         |  5 +++
 .../flink/core/httpfn/HttpRequestReplyClient.java  | 11 ++++--
 .../flink/core/httpfn/RetryingCallback.java        | 35 ++++++++++++++++--
 .../core/metrics/FlinkFunctionTypeMetrics.java     | 23 ++++++++++++
 .../flink/core/metrics/FunctionTypeMetrics.java    |  2 +-
 ...erMetrics.java => RemoteInvocationMetrics.java} |  6 ++--
 .../flink/core/reqreply/RequestReplyClient.java    |  6 +++-
 .../flink/core/reqreply/RequestReplyFunction.java  | 12 +++++--
 .../core/reqreply/ToFunctionRequestSummary.java    | 41 ++++++++++++++++------
 .../core/reqreply/RequestReplyFunctionTest.java    | 12 ++++++-
 statefun-flink/statefun-flink-distribution/pom.xml |  7 ++++
 12 files changed, 151 insertions(+), 24 deletions(-)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/{FunctionDispatcherMetrics.java => RemoteInvocationMetrics.java} (87%)
 copy statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MatchersByAddress.java => statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/ToFunctionRequestSummary.java (50%)


[flink-statefun] 02/02: [hotfix] Fix spotless style violations

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit ae57f9e4e5c5384e3a21226522837a0a4a938bbc
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Wed Sep 9 16:18:24 2020 +0200

    [hotfix] Fix spotless style violations
---
 .../flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java     | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
index d529b93..d9e5cb3 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
@@ -31,7 +31,6 @@ import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.RequestBody;
 import okhttp3.Response;
-
 import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
 import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction;
@@ -52,7 +51,9 @@ final class HttpRequestReplyClient implements RequestReplyClient {
 
   @Override
   public CompletableFuture<FromFunction> call(
-          ToFunctionRequestSummary requestSummary, RemoteInvocationMetrics metrics, ToFunction toFunction) {
+      ToFunctionRequestSummary requestSummary,
+      RemoteInvocationMetrics metrics,
+      ToFunction toFunction) {
     Request request =
         new Request.Builder()
             .url(url)


[flink-statefun] 01/02: [FLINK-19017] Report remote function invocation metrics

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 85d064af89c5b55de3fd2b665bbee2e35e1ba556
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Wed Sep 9 15:59:26 2020 +0200

    [FLINK-19017] Report remote function invocation metrics
    
    This closes #143.
---
 .../flink/core/httpfn/HttpRequestReplyClient.java  |  4 ++--
 .../flink/core/httpfn/RetryingCallback.java        | 26 ++++++++++++++++++++--
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
index 93f4d0e..d529b93 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
@@ -60,8 +60,8 @@ final class HttpRequestReplyClient implements RequestReplyClient {
             .build();
 
     Call newCall = client.newCall(request);
-    RetryingCallback callback = new RetryingCallback(requestSummary, newCall.timeout());
-    newCall.enqueue(callback);
+    RetryingCallback callback = new RetryingCallback(requestSummary, metrics, newCall.timeout());
+    callback.attachToCall(newCall);
     return callback.future().thenApply(HttpRequestReplyClient::parseResponse);
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
index 18ca50e..0a6f834 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
@@ -24,11 +24,13 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import okhttp3.Call;
 import okhttp3.Callback;
 import okhttp3.Response;
 import okio.Timeout;
 import org.apache.flink.statefun.flink.core.backpressure.BoundedExponentialBackoff;
+import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
 import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
 import org.apache.flink.util.function.RunnableWithException;
 import org.slf4j.Logger;
@@ -46,17 +48,27 @@ final class RetryingCallback implements Callback {
   private final CompletableFuture<Response> resultFuture;
   private final BoundedExponentialBackoff backoff;
   private final ToFunctionRequestSummary requestSummary;
+  private final RemoteInvocationMetrics metrics;
 
-  RetryingCallback(ToFunctionRequestSummary requestSummary, Timeout timeout) {
+  private long requestStarted;
+
+  RetryingCallback(
+      ToFunctionRequestSummary requestSummary, RemoteInvocationMetrics metrics, Timeout timeout) {
     this.resultFuture = new CompletableFuture<>();
     this.backoff = new BoundedExponentialBackoff(INITIAL_BACKOFF_DURATION, duration(timeout));
     this.requestSummary = requestSummary;
+    this.metrics = metrics;
   }
 
   CompletableFuture<Response> future() {
     return resultFuture;
   }
 
+  void attachToCall(Call call) {
+    this.requestStarted = System.nanoTime();
+    call.enqueue(this);
+  }
+
   @Override
   public void onFailure(Call call, IOException cause) {
     tryWithFuture(() -> onFailureUnsafe(call, cause));
@@ -70,6 +82,8 @@ final class RetryingCallback implements Callback {
   private void onFailureUnsafe(Call call, IOException cause) {
     LOG.warn(
         "Retriable exception caught while trying to deliver a message: " + requestSummary, cause);
+    metrics.remoteInvocationFailures();
+
     if (!retryAfterApplyingBackoff(call)) {
       throw new IllegalStateException(
           "Maximal request time has elapsed. Last cause is attached", cause);
@@ -98,7 +112,8 @@ final class RetryingCallback implements Callback {
    */
   private boolean retryAfterApplyingBackoff(Call call) {
     if (backoff.applyNow()) {
-      call.clone().enqueue(this);
+      Call newCall = call.clone();
+      attachToCall(newCall);
       return true;
     }
     return false;
@@ -110,6 +125,7 @@ final class RetryingCallback implements Callback {
    */
   private void tryWithFuture(RunnableWithException runnable) {
     try {
+      endTimingRequest();
       runnable.run();
     } catch (Throwable t) {
       resultFuture.completeExceptionally(t);
@@ -119,4 +135,10 @@ final class RetryingCallback implements Callback {
   private static Duration duration(Timeout timeout) {
     return Duration.ofNanos(timeout.timeoutNanos());
   }
+
+  private void endTimingRequest() {
+    final long nanosecondsElapsed = System.nanoTime() - requestStarted;
+    final long millisecondsElapsed = TimeUnit.NANOSECONDS.toMillis(nanosecondsElapsed);
+    metrics.remoteInvocationLatency(millisecondsElapsed);
+  }
 }