You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/14 17:31:11 UTC

[flink-statefun] 06/11: [FLINK-15956] Use ExponentialBackoff during retries

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 69abd0984f8b3acb11306a38676c346e3a189c60
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 13 14:56:17 2020 +0100

    [FLINK-15956] Use ExponentialBackoff during retries
---
 .../flink/core/httpfn/HttpFunctionProvider.java    |   6 +-
 .../statefun/flink/core/httpfn/OkHttpUtils.java    |  70 ++++---------
 .../flink/core/httpfn/RetryingCallback.java        | 113 +++++++++++++++++++++
 3 files changed, 136 insertions(+), 53 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index be027e0..3d94109 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -29,11 +29,7 @@ public class HttpFunctionProvider implements StatefulFunctionProvider {
 
   public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) {
     this.supportedTypes = supportedTypes;
-    final long timeoutMs = 30_000;
-    // TODO: add various timeouts to HttpFunctionSpec
-    this.client =
-        OkHttpUtils.newClient(
-            timeoutMs, timeoutMs, 2 * timeoutMs, timeoutMs, Integer.MAX_VALUE, Integer.MAX_VALUE);
+    this.client = OkHttpUtils.newClient();
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
index 100c932..c5fb4be 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
@@ -18,13 +18,11 @@ package org.apache.flink.statefun.flink.core.httpfn;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
-import java.io.IOException;
 import java.io.InputStream;
+import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import okhttp3.Call;
-import okhttp3.Callback;
 import okhttp3.ConnectionPool;
 import okhttp3.Dispatcher;
 import okhttp3.MediaType;
@@ -35,63 +33,39 @@ import okhttp3.Response;
 final class OkHttpUtils {
   private OkHttpUtils() {}
 
-  static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream");
-
-  static CompletableFuture<Response> call(OkHttpClient client, Request request) {
-    CompletableFuture<Response> future = new CompletableFuture<>();
-    client.newCall(request).enqueue(new CompletableFutureCallback(future));
-    return future;
-  }
+  private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofMinutes(2);
 
-  static InputStream responseBody(Response httpResponse) {
-    checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code());
-    checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)");
-    checkState(
-        Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY),
-        "Wrong HTTP content-type %s",
-        httpResponse.body().contentType());
-    return httpResponse.body().byteStream();
-  }
+  static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream");
 
-  static OkHttpClient newClient(
-      long readTimeoutMillis,
-      long writeTimeoutMillis,
-      long callTimeout,
-      long connectionTimeInMillis,
-      int maxRequestsPerHost,
-      int maxRequests) {
+  static OkHttpClient newClient() {
     Dispatcher dispatcher = new Dispatcher();
-    dispatcher.setMaxRequestsPerHost(maxRequestsPerHost);
-    dispatcher.setMaxRequests(maxRequests);
+    dispatcher.setMaxRequestsPerHost(Integer.MAX_VALUE);
+    dispatcher.setMaxRequests(Integer.MAX_VALUE);
 
     return new OkHttpClient.Builder()
-        .connectTimeout(connectionTimeInMillis, TimeUnit.MILLISECONDS)
-        .writeTimeout(writeTimeoutMillis, TimeUnit.MILLISECONDS)
-        .readTimeout(readTimeoutMillis, TimeUnit.MILLISECONDS)
-        .callTimeout(callTimeout, TimeUnit.MILLISECONDS)
+        .callTimeout(DEFAULT_CALL_TIMEOUT)
         .dispatcher(dispatcher)
         .connectionPool(new ConnectionPool())
         .followRedirects(true)
         .followSslRedirects(true)
+        .retryOnConnectionFailure(true)
         .build();
   }
 
-  @SuppressWarnings("NullableProblems")
-  private static final class CompletableFutureCallback implements Callback {
-    private final CompletableFuture<Response> future;
-
-    public CompletableFutureCallback(CompletableFuture<Response> future) {
-      this.future = Objects.requireNonNull(future);
-    }
-
-    @Override
-    public void onFailure(Call call, IOException e) {
-      future.completeExceptionally(e);
-    }
+  static CompletableFuture<Response> call(OkHttpClient client, Request request) {
+    Call newCall = client.newCall(request);
+    RetryingCallback callback = new RetryingCallback(newCall.timeout());
+    newCall.enqueue(callback);
+    return callback.future();
+  }
 
-    @Override
-    public void onResponse(Call call, Response response) {
-      future.complete(response);
-    }
+  static InputStream responseBody(Response httpResponse) {
+    checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code());
+    checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)");
+    checkState(
+        Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY),
+        "Wrong HTTP content-type %s",
+        httpResponse.body().contentType());
+    return httpResponse.body().byteStream();
   }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
new file mode 100644
index 0000000..f8a4a8a
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.Response;
+import okio.Timeout;
+import org.apache.flink.statefun.flink.core.backpressure.BoundedExponentialBackoff;
+import org.apache.flink.util.function.RunnableWithException;
+
+@SuppressWarnings("NullableProblems")
+final class RetryingCallback implements Callback {
+  private static final Duration INITIAL_BACKOFF_DURATION = Duration.ofMillis(10);
+
+  private static final Set<Integer> RETRYABLE_HTTP_CODES =
+      new HashSet<>(Arrays.asList(409, 420, 408, 429, 499, 500));
+
+  private final CompletableFuture<Response> resultFuture;
+  private final BoundedExponentialBackoff backoff;
+
+  RetryingCallback(Timeout timeout) {
+    this.resultFuture = new CompletableFuture<>();
+    this.backoff = new BoundedExponentialBackoff(INITIAL_BACKOFF_DURATION, duration(timeout));
+  }
+
+  CompletableFuture<Response> future() {
+    return resultFuture;
+  }
+
+  @Override
+  public void onFailure(Call call, IOException cause) {
+    tryWithFuture(() -> onFailureUnsafe(call, cause));
+  }
+
+  @Override
+  public void onResponse(Call call, Response response) {
+    tryWithFuture(() -> onResponseUnsafe(call, response));
+  }
+
+  private void onFailureUnsafe(Call call, IOException cause) {
+    if (!retryAfterApplyingBackoff(call)) {
+      throw new IllegalStateException(
+          "Maximal request time has elapsed. Last cause is attached", cause);
+    }
+  }
+
+  private void onResponseUnsafe(Call call, Response response) {
+    if (response.isSuccessful()) {
+      resultFuture.complete(response);
+      return;
+    }
+    if (!RETRYABLE_HTTP_CODES.contains(response.code())) {
+      throw new IllegalStateException("Non successful HTTP response code " + response.code());
+    }
+    if (!retryAfterApplyingBackoff(call)) {
+      throw new IllegalStateException(
+          "Maximal request time has elapsed. Last known error is: invalid HTTP response code "
+              + response.code());
+    }
+  }
+
+  /**
+   * Retries the original call after applying backoff.
+   *
+   * @return {@code true} if a retry of the call was enqueued, {@code false} otherwise.
+   */
+  private boolean retryAfterApplyingBackoff(Call call) {
+    if (backoff.applyNow()) {
+      call.clone().enqueue(this);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Executes the runnable and completes {@link #resultFuture} exceptionally with any throwable
+   * thrown during its execution.
+   */
+  private void tryWithFuture(RunnableWithException runnable) {
+    try {
+      runnable.run();
+    } catch (Throwable t) {
+      resultFuture.completeExceptionally(t);
+    }
+  }
+
+  private static Duration duration(Timeout timeout) {
+    return Duration.ofNanos(timeout.timeoutNanos());
+  }
+}