You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/14 17:31:09 UTC
[flink-statefun] 04/11: [FLINK-15956] Add an initial HttpFunction
implementation
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 94def7745a6ee934150db31e19bf7e7665713ba2
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue Feb 11 16:07:18 2020 +0100
[FLINK-15956] Add an initial HttpFunction implementation
---
.../statefun/flink/core/common/PolyglotUtil.java | 53 ++++++
.../statefun/flink/core/httpfn/HttpFunction.java | 192 ++++++++++-----------
.../flink/core/httpfn/HttpFunctionProvider.java | 12 +-
.../statefun/flink/core/httpfn/OkHttpUtils.java | 51 +++++-
.../src/test/resources/bar-module/module.yaml | 2 +
5 files changed, 198 insertions(+), 112 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java
new file mode 100644
index 0000000..c901e77
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.common;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import java.io.IOException;
+import java.io.InputStream;
+import javax.annotation.Nonnull;
+import org.apache.flink.statefun.flink.core.polyglot.generated.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+
+public final class PolyglotUtil {
+ private PolyglotUtil() {}
+
+ public static <M extends Message> M parseProtobufOrThrow(Parser<M> parser, InputStream input) {
+ try {
+ return parser.parseFrom(input);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to parse a Protobuf message", e);
+ }
+ }
+
+ public static Address sdkAddressToPolyglotAddress(
+ @Nonnull org.apache.flink.statefun.sdk.Address sdkAddress) {
+ return Address.newBuilder()
+ .setNamespace(sdkAddress.type().namespace())
+ .setType(sdkAddress.type().name())
+ .setId(sdkAddress.id())
+ .build();
+ }
+
+ public static org.apache.flink.statefun.sdk.Address polyglotAddressToSdkAddress(Address address) {
+ return new org.apache.flink.statefun.sdk.Address(
+ new FunctionType(address.getNamespace(), address.getType()), address.getId());
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index 51f0405..506081a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -18,42 +18,38 @@
package org.apache.flink.statefun.flink.core.httpfn;
-import org.apache.flink.statefun.flink.core.polyglot.generated.Address;
+import static org.apache.flink.statefun.flink.core.common.PolyglotUtil.parseProtobufOrThrow;
+import static org.apache.flink.statefun.flink.core.common.PolyglotUtil.polyglotAddressToSdkAddress;
+import static org.apache.flink.statefun.flink.core.common.PolyglotUtil.sdkAddressToPolyglotAddress;
+import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUtils.MEDIA_TYPE_BINARY;
+import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUtils.responseBody;
+import static org.apache.flink.util.Preconditions.checkState;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse;
import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction;
import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.Invocation;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.Invocation.Builder;
import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest;
+import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import okhttp3.HttpUrl;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Objects;
-
-import javax.annotation.Nonnull;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
final class HttpFunction implements StatefulFunction {
- private static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream");
-
private final HttpFunctionSpec functionSpec;
private final OkHttpClient client;
private final HttpUrl url;
@@ -78,17 +74,18 @@ final class HttpFunction implements StatefulFunction {
@Override
public void invoke(Context context, Object input) {
- if (input instanceof AsyncOperationResult) {
- @SuppressWarnings("unchecked")
- AsyncOperationResult<Any, Response> result = (AsyncOperationResult<Any, Response>) input;
- onAsyncResult(context, result);
- } else {
+ if (!(input instanceof AsyncOperationResult)) {
onRequest(context, (Any) input);
+ return;
}
+ @SuppressWarnings("unchecked")
+ AsyncOperationResult<ToFunction, Response> result =
+ (AsyncOperationResult<ToFunction, Response>) input;
+ onAsyncResult(context, result);
}
- private void onRequest(Context context, Any input) {
- Invocation.Builder invocationBuilder = invocationBuilder(context, input);
+ private void onRequest(Context context, Any message) {
+ Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
addToOrCreateBatch(invocationBuilder);
return;
@@ -97,14 +94,23 @@ final class HttpFunction implements StatefulFunction {
sendToFunction(context, invocationBuilder);
}
- private void onAsyncResult(Context context, AsyncOperationResult<Any, Response> asyncResult) {
+ private void addToOrCreateBatch(Invocation.Builder invocationBuilder) {
+ InvocationBatchRequest current = batch.get();
+ InvocationBatchRequest.Builder next =
+ (current == null) ? InvocationBatchRequest.newBuilder() : current.toBuilder();
+ next.addInvocations(invocationBuilder);
+ batch.set(next.build());
+ }
+
+ private void onAsyncResult(
+ Context context, AsyncOperationResult<ToFunction, Response> asyncResult) {
if (asyncResult.unknown()) {
- Any originalMessage = asyncResult.metadata();
- Invocation.Builder invocationBuilder = invocationBuilder(context, originalMessage);
- sendToFunction(context, invocationBuilder);
+ ToFunction batch = asyncResult.metadata();
+ sendToFunction(context, batch);
return;
}
- InvocationResponse invocationResult = unpackInvocationResultOrThrow(asyncResult);
+ InvocationResponse invocationResult =
+ unpackInvocationResultOrThrow(context.self(), asyncResult);
handleInvocationResponse(context, invocationResult);
InvocationBatchRequest nextBatch = batch.get();
if (nextBatch == null) {
@@ -123,6 +129,27 @@ final class HttpFunction implements StatefulFunction {
context.send(to, message);
}
+ handleStateMutations(invocationResult);
+ }
+
+ // --------------------------------------------------------------------------------
+ // State Management
+ // --------------------------------------------------------------------------------
+
+ private void addStates(ToFunction.InvocationBatchRequest.Builder batchBuilder) {
+ for (String stateName : functionSpec.states()) {
+ ToFunction.PersistedValue.Builder valueBuilder =
+ ToFunction.PersistedValue.newBuilder().setStateName(stateName);
+
+ byte[] stateValue = managedStates.get(stateName);
+ if (stateValue != null) {
+ valueBuilder.setStateValue(ByteString.copyFrom(stateValue));
+ }
+ batchBuilder.addState(valueBuilder);
+ }
+ }
+
+ private void handleStateMutations(InvocationResponse invocationResult) {
for (FromFunction.PersistedValueMutation mutate : invocationResult.getStateMutationsList()) {
final String stateName = mutate.getStateName();
switch (mutate.getMutationType()) {
@@ -140,104 +167,65 @@ final class HttpFunction implements StatefulFunction {
}
}
- private void addToOrCreateBatch(Builder invocationBuilder) {
- batch.updateAndGet(
- existingBatch -> {
- final InvocationBatchRequest.Builder builder =
- existingBatch != null
- ? existingBatch.toBuilder()
- : InvocationBatchRequest.newBuilder();
- return builder.addInvocations(invocationBuilder).build();
- });
- }
-
// --------------------------------------------------------------------------------
- // Utilities
+ // Send Message to Remote Function
// --------------------------------------------------------------------------------
- private static Builder invocationBuilder(Context context, Any input) {
+
+ /**
+ * Returns an {@link Invocation.Builder} set with the input {@code message} and the caller
+ * information (if present).
+ */
+ private static Invocation.Builder singeInvocationBuilder(Context context, Any message) {
Invocation.Builder invocationBuilder = Invocation.newBuilder();
if (context.caller() != null) {
invocationBuilder.setCaller(sdkAddressToPolyglotAddress(context.caller()));
}
- invocationBuilder.setArgument(input);
+ invocationBuilder.setArgument(message);
return invocationBuilder;
}
- private void addStates(ToFunction.InvocationBatchRequest.Builder batchBuilder) {
- for (String stateName : functionSpec.states()) {
- ToFunction.PersistedValue.Builder valueBuilder =
- ToFunction.PersistedValue.newBuilder().setStateName(stateName);
-
- byte[] stateValue = managedStates.get(stateName);
- if (stateValue != null) {
- valueBuilder.setStateValue(ByteString.copyFrom(stateValue));
- }
- batchBuilder.addState(valueBuilder);
- }
- }
-
+ /**
+ * Sends a {@link InvocationBatchRequest} to the remote function consisting of a single
+ * invocation represented by {@code invocationBuilder}.
+ */
private void sendToFunction(Context context, Invocation.Builder invocationBuilder) {
InvocationBatchRequest.Builder batchBuilder = InvocationBatchRequest.newBuilder();
batchBuilder.addInvocations(invocationBuilder);
sendToFunction(context, batchBuilder);
}
+ /** Sends a {@link InvocationBatchRequest} to the remote function. */
private void sendToFunction(Context context, InvocationBatchRequest.Builder batchBuilder) {
batchBuilder.setTarget(sdkAddressToPolyglotAddress(context.self()));
addStates(batchBuilder);
+ ToFunction toFunction = ToFunction.newBuilder().setInvocation(batchBuilder).build();
+ sendToFunction(context, toFunction);
+ }
+
+ private void sendToFunction(Context context, ToFunction toFunction) {
Request request =
new Request.Builder()
.url(url)
- .post(
- RequestBody.create(
- MEDIA_TYPE_BINARY,
- ToFunction.newBuilder().setInvocation(batchBuilder).build().toByteArray()))
+ .post(RequestBody.create(MEDIA_TYPE_BINARY, toFunction.toByteArray()))
.build();
- OkHttpUtils.call(client, request);
+
+ CompletableFuture<Response> responseFuture = OkHttpUtils.call(client, request);
+ context.registerAsyncOperation(toFunction, responseFuture);
}
- private static InvocationResponse unpackInvocationResultOrThrow(
- AsyncOperationResult<Any, Response> asyncResult) {
- checkState(asyncResult.failure() || asyncResult.successful());
+ private InvocationResponse unpackInvocationResultOrThrow(
+ Address self, AsyncOperationResult<?, Response> asyncResult) {
+ checkState(!asyncResult.unknown());
if (asyncResult.failure()) {
- throw new IllegalStateException("", asyncResult.throwable());
+ throw new IllegalStateException(
+ "Failure forwarding a message to a remote function " + self, asyncResult.throwable());
}
- Response httpResponse = asyncResult.value();
- checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code());
- checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)");
- checkState(
- Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY),
- "Wrong HTTP content-type %s",
- httpResponse.body().contentType());
- InputStream httpResponseBody = httpResponse.body().byteStream();
- FromFunction fromFunction = parseProtobufOrThrow(httpResponseBody);
+ InputStream httpResponseBody = responseBody(asyncResult.value());
+ FromFunction fromFunction = parseProtobufOrThrow(FromFunction.parser(), httpResponseBody);
checkState(
fromFunction.hasInvocationResult(),
"The received HTTP payload does not contain an InvocationResult, but rather [%s]",
fromFunction);
return fromFunction.getInvocationResult();
}
-
- private static FromFunction parseProtobufOrThrow(InputStream input) {
- try {
- return FromFunction.parseFrom(input);
- } catch (IOException e) {
- throw new IllegalStateException("Unable to parse a Protobuf message", e);
- }
- }
-
- private static Address sdkAddressToPolyglotAddress(
- @Nonnull org.apache.flink.statefun.sdk.Address sdkAddress) {
- return Address.newBuilder()
- .setNamespace(sdkAddress.type().namespace())
- .setType(sdkAddress.type().name())
- .setId(sdkAddress.id())
- .build();
- }
-
- private static org.apache.flink.statefun.sdk.Address polyglotAddressToSdkAddress(
- Address address) {
- return new org.apache.flink.statefun.sdk.Address(
- new FunctionType(address.getNamespace(), address.getType()), address.getId());
- }
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index a546865..be027e0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -18,20 +18,22 @@
package org.apache.flink.statefun.flink.core.httpfn;
+import java.util.Map;
+import okhttp3.OkHttpClient;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
-import okhttp3.OkHttpClient;
-
-import java.util.Map;
-
public class HttpFunctionProvider implements StatefulFunctionProvider {
private final Map<FunctionType, HttpFunctionSpec> supportedTypes;
private final OkHttpClient client;
public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) {
this.supportedTypes = supportedTypes;
- this.client = new OkHttpClient();
+ final long timeoutMs = 30_000;
+ // TODO: add various timeouts to HttpFunctionSpec
+ this.client =
+ OkHttpUtils.newClient(
+ timeoutMs, timeoutMs, 2 * timeoutMs, timeoutMs, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
@Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
index 1839c36..100c932 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
@@ -16,25 +16,66 @@
package org.apache.flink.statefun.flink.core.httpfn;
+import static org.apache.flink.util.Preconditions.checkState;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import okhttp3.Call;
import okhttp3.Callback;
+import okhttp3.ConnectionPool;
+import okhttp3.Dispatcher;
+import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
-import java.io.IOException;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-
final class OkHttpUtils {
private OkHttpUtils() {}
- public static CompletableFuture<Response> call(OkHttpClient client, Request request) {
+ static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream");
+
+ static CompletableFuture<Response> call(OkHttpClient client, Request request) {
CompletableFuture<Response> future = new CompletableFuture<>();
client.newCall(request).enqueue(new CompletableFutureCallback(future));
return future;
}
+ static InputStream responseBody(Response httpResponse) {
+ checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code());
+ checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)");
+ checkState(
+ Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY),
+ "Wrong HTTP content-type %s",
+ httpResponse.body().contentType());
+ return httpResponse.body().byteStream();
+ }
+
+ static OkHttpClient newClient(
+ long readTimeoutMillis,
+ long writeTimeoutMillis,
+ long callTimeout,
+ long connectionTimeInMillis,
+ int maxRequestsPerHost,
+ int maxRequests) {
+ Dispatcher dispatcher = new Dispatcher();
+ dispatcher.setMaxRequestsPerHost(maxRequestsPerHost);
+ dispatcher.setMaxRequests(maxRequests);
+
+ return new OkHttpClient.Builder()
+ .connectTimeout(connectionTimeInMillis, TimeUnit.MILLISECONDS)
+ .writeTimeout(writeTimeoutMillis, TimeUnit.MILLISECONDS)
+ .readTimeout(readTimeoutMillis, TimeUnit.MILLISECONDS)
+ .callTimeout(callTimeout, TimeUnit.MILLISECONDS)
+ .dispatcher(dispatcher)
+ .connectionPool(new ConnectionPool())
+ .followRedirects(true)
+ .followSslRedirects(true)
+ .build();
+ }
+
@SuppressWarnings("NullableProblems")
private static final class CompletableFutureCallback implements Callback {
private final CompletableFuture<Response> future;
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
index c659fe9..2ef6956 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
@@ -31,6 +31,8 @@ module:
type: com.foo/world
spec:
endpoint: localhost:5959/statefun
+ states:
+ - seen_count
routers:
- router:
meta: