You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/14 17:31:11 UTC
[flink-statefun] 06/11: [FLINK-15956] Use ExponentialBackoff during
retries
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 69abd0984f8b3acb11306a38676c346e3a189c60
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 13 14:56:17 2020 +0100
[FLINK-15956] Use ExponentialBackoff during retries
---
.../flink/core/httpfn/HttpFunctionProvider.java | 6 +-
.../statefun/flink/core/httpfn/OkHttpUtils.java | 70 ++++---------
.../flink/core/httpfn/RetryingCallback.java | 113 +++++++++++++++++++++
3 files changed, 136 insertions(+), 53 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index be027e0..3d94109 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -29,11 +29,7 @@ public class HttpFunctionProvider implements StatefulFunctionProvider {
+ /**
+ * Creates a provider for the given HTTP functions and builds the HTTP client used to call them.
+ *
+ * @param supportedTypes the HTTP function specs this provider supports, keyed by function type
+ */
public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) {
this.supportedTypes = supportedTypes;
- final long timeoutMs = 30_000;
- // TODO: add various timeouts to HttpFunctionSpec
- this.client =
- OkHttpUtils.newClient(
- timeoutMs, timeoutMs, 2 * timeoutMs, timeoutMs, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ // NOTE(review): timeouts are now fixed inside OkHttpUtils.newClient(); the removed TODO about
+ // making them configurable through HttpFunctionSpec is still open.
+ this.client = OkHttpUtils.newClient();
}
@Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
index 100c932..c5fb4be 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
@@ -18,13 +18,11 @@ package org.apache.flink.statefun.flink.core.httpfn;
import static org.apache.flink.util.Preconditions.checkState;
-import java.io.IOException;
import java.io.InputStream;
+import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import okhttp3.Call;
-import okhttp3.Callback;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.MediaType;
@@ -35,63 +33,39 @@ import okhttp3.Response;
final class OkHttpUtils {
private OkHttpUtils() {}
- static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream");
-
- static CompletableFuture<Response> call(OkHttpClient client, Request request) {
- CompletableFuture<Response> future = new CompletableFuture<>();
- client.newCall(request).enqueue(new CompletableFutureCallback(future));
- return future;
- }
+ // Budget for one whole call. OkHttp's Call#timeout() spans the entire call, and call(...)
+ // hands that timeout to RetryingCallback, which also uses it to bound its retries.
+ private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofMinutes(2);
- static InputStream responseBody(Response httpResponse) {
- checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code());
- checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)");
- checkState(
- Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY),
- "Wrong HTTP content-type %s",
- httpResponse.body().contentType());
- return httpResponse.body().byteStream();
- }
+ // The only response content type accepted by responseBody(...).
+ static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream");
- static OkHttpClient newClient(
- long readTimeoutMillis,
- long writeTimeoutMillis,
- long callTimeout,
- long connectionTimeInMillis,
- int maxRequestsPerHost,
- int maxRequests) {
+ /**
+ * Creates an HTTP client with no practical limit on concurrent requests (globally or per host),
+ * a DEFAULT_CALL_TIMEOUT call timeout, HTTP and HTTPS redirects followed, and retries on
+ * connection failures enabled. Connect/read/write timeouts are not set here, so OkHttp's
+ * defaults apply.
+ */
+ static OkHttpClient newClient() {
Dispatcher dispatcher = new Dispatcher();
- dispatcher.setMaxRequestsPerHost(maxRequestsPerHost);
- dispatcher.setMaxRequests(maxRequests);
+ dispatcher.setMaxRequestsPerHost(Integer.MAX_VALUE);
+ dispatcher.setMaxRequests(Integer.MAX_VALUE);
return new OkHttpClient.Builder()
- .connectTimeout(connectionTimeInMillis, TimeUnit.MILLISECONDS)
- .writeTimeout(writeTimeoutMillis, TimeUnit.MILLISECONDS)
- .readTimeout(readTimeoutMillis, TimeUnit.MILLISECONDS)
- .callTimeout(callTimeout, TimeUnit.MILLISECONDS)
+ .callTimeout(DEFAULT_CALL_TIMEOUT)
.dispatcher(dispatcher)
.connectionPool(new ConnectionPool())
.followRedirects(true)
.followSslRedirects(true)
+ .retryOnConnectionFailure(true)
.build();
}
- @SuppressWarnings("NullableProblems")
- private static final class CompletableFutureCallback implements Callback {
- private final CompletableFuture<Response> future;
-
- public CompletableFutureCallback(CompletableFuture<Response> future) {
- this.future = Objects.requireNonNull(future);
- }
-
- @Override
- public void onFailure(Call call, IOException e) {
- future.completeExceptionally(e);
- }
+ /**
+ * Enqueues {@code request} asynchronously. The returned future is completed by a
+ * {@link RetryingCallback}, which may re-issue the call with a backoff bounded by the call's
+ * timeout.
+ */
+ static CompletableFuture<Response> call(OkHttpClient client, Request request) {
+ Call newCall = client.newCall(request);
+ RetryingCallback callback = new RetryingCallback(newCall.timeout());
+ newCall.enqueue(callback);
+ return callback.future();
+ }
- @Override
- public void onResponse(Call call, Response response) {
- future.complete(response);
- }
+ /**
+ * Returns the body of {@code httpResponse} as a byte stream.
+ *
+ * @throws IllegalStateException if the response is not successful, has no body, or its content
+ * type is not MEDIA_TYPE_BINARY
+ */
+ static InputStream responseBody(Response httpResponse) {
+ checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code());
+ checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)");
+ checkState(
+ Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY),
+ "Wrong HTTP content-type %s",
+ httpResponse.body().contentType());
+ return httpResponse.body().byteStream();
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
new file mode 100644
index 0000000..f8a4a8a
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.Response;
+import okio.Timeout;
+import org.apache.flink.statefun.flink.core.backpressure.BoundedExponentialBackoff;
+import org.apache.flink.util.function.RunnableWithException;
+
+/**
+ * OkHttp {@link Callback} that completes {@link #future()} with the first successful response. On
+ * I/O failures, and on a fixed set of retryable HTTP status codes, it re-enqueues a clone of the
+ * call after backoff.
+ *
+ * <p>The total backoff is bounded by the {@link Timeout} given to the constructor. Any exception
+ * thrown while handling a callback completes the future exceptionally.
+ */
+@SuppressWarnings("NullableProblems")
+final class RetryingCallback implements Callback {
+  private static final Duration INITIAL_BACKOFF_DURATION = Duration.ofMillis(10);
+
+  // NOTE(review): 502/503/504 (common transient gateway errors) are not in this set.
+  private static final Set<Integer> RETRYABLE_HTTP_CODES =
+      new HashSet<>(Arrays.asList(409, 420, 408, 429, 499, 500));
+
+  private final CompletableFuture<Response> resultFuture;
+  private final BoundedExponentialBackoff backoff;
+
+  /**
+   * @param timeout timeout of the call being executed; its duration is the upper bound handed to
+   *     the backoff. NOTE(review): a zero timeout (OkHttp's "no timeout") yields a zero bound —
+   *     presumably meaning no retries at all; confirm against BoundedExponentialBackoff.
+   */
+  RetryingCallback(Timeout timeout) {
+    this.resultFuture = new CompletableFuture<>();
+    this.backoff = new BoundedExponentialBackoff(INITIAL_BACKOFF_DURATION, duration(timeout));
+  }
+
+  /** Returns the future completed with the final successful response, or with the failure. */
+  CompletableFuture<Response> future() {
+    return resultFuture;
+  }
+
+  @Override
+  public void onFailure(Call call, IOException cause) {
+    tryWithFuture(() -> onFailureUnsafe(call, cause));
+  }
+
+  @Override
+  public void onResponse(Call call, Response response) {
+    tryWithFuture(() -> onResponseUnsafe(call, response));
+  }
+
+  private void onFailureUnsafe(Call call, IOException cause) {
+    if (!retryAfterApplyingBackoff(call)) {
+      throw new IllegalStateException(
+          "Maximal request time has elapsed. Last cause is attached", cause);
+    }
+  }
+
+  private void onResponseUnsafe(Call call, Response response) {
+    if (response.isSuccessful()) {
+      // Ownership of the response (and the duty to close it) passes to the future's consumer.
+      resultFuture.complete(response);
+      return;
+    }
+    // This response is discarded (we either fail or retry), so close it now to release its
+    // connection; OkHttp requires every response body to be closed.
+    final int code = response.code();
+    response.close();
+    if (!RETRYABLE_HTTP_CODES.contains(code)) {
+      throw new IllegalStateException("Non successful HTTP response code " + code);
+    }
+    if (!retryAfterApplyingBackoff(call)) {
+      throw new IllegalStateException(
+          "Maximal request time has elapsed. Last known error is: invalid HTTP response code "
+              + code);
+    }
+  }
+
+  /**
+   * Retries the original call, after applying backoff.
+   *
+   * <p>NOTE(review): if {@code backoff.applyNow()} sleeps, it does so on OkHttp's callback thread —
+   * confirm this is acceptable.
+   *
+   * @return {@code true} if a clone of {@code call} was enqueued with this callback, {@code false}
+   *     if the backoff declined another attempt
+   */
+  private boolean retryAfterApplyingBackoff(Call call) {
+    if (backoff.applyNow()) {
+      call.clone().enqueue(this);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Executes the runnable, completing {@link #resultFuture} exceptionally with anything it throws.
+   */
+  private void tryWithFuture(RunnableWithException runnable) {
+    try {
+      runnable.run();
+    } catch (Throwable t) {
+      resultFuture.completeExceptionally(t);
+    }
+  }
+
+  /** Converts an okio {@link Timeout} into a {@link Duration}. */
+  private static Duration duration(Timeout timeout) {
+    return Duration.ofNanos(timeout.timeoutNanos());
+  }
+}