You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/14 17:31:09 UTC

[flink-statefun] 04/11: [FLINK-15956] Add an initial HttpFunction implemention

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 94def7745a6ee934150db31e19bf7e7665713ba2
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue Feb 11 16:07:18 2020 +0100

    [FLINK-15956] Add an initial HttpFunction implemention
---
 .../statefun/flink/core/common/PolyglotUtil.java   |  53 ++++++
 .../statefun/flink/core/httpfn/HttpFunction.java   | 192 ++++++++++-----------
 .../flink/core/httpfn/HttpFunctionProvider.java    |  12 +-
 .../statefun/flink/core/httpfn/OkHttpUtils.java    |  51 +++++-
 .../src/test/resources/bar-module/module.yaml      |   2 +
 5 files changed, 198 insertions(+), 112 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java
new file mode 100644
index 0000000..c901e77
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.common;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import java.io.IOException;
+import java.io.InputStream;
+import javax.annotation.Nonnull;
+import org.apache.flink.statefun.flink.core.polyglot.generated.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+
+public final class PolyglotUtil {
+  private PolyglotUtil() {}
+
+  public static <M extends Message> M parseProtobufOrThrow(Parser<M> parser, InputStream input) {
+    try {
+      return parser.parseFrom(input);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to parse a Protobuf message", e);
+    }
+  }
+
+  public static Address sdkAddressToPolyglotAddress(
+      @Nonnull org.apache.flink.statefun.sdk.Address sdkAddress) {
+    return Address.newBuilder()
+        .setNamespace(sdkAddress.type().namespace())
+        .setType(sdkAddress.type().name())
+        .setId(sdkAddress.id())
+        .build();
+  }
+
+  public static org.apache.flink.statefun.sdk.Address polyglotAddressToSdkAddress(Address address) {
+    return new org.apache.flink.statefun.sdk.Address(
+        new FunctionType(address.getNamespace(), address.getType()), address.getId());
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index 51f0405..506081a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -18,42 +18,38 @@
 
 package org.apache.flink.statefun.flink.core.httpfn;
 
-import org.apache.flink.statefun.flink.core.polyglot.generated.Address;
+import static org.apache.flink.statefun.flink.core.common.PolyglotUtil.parseProtobufOrThrow;
+import static org.apache.flink.statefun.flink.core.common.PolyglotUtil.polyglotAddressToSdkAddress;
+import static org.apache.flink.statefun.flink.core.common.PolyglotUtil.sdkAddressToPolyglotAddress;
+import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUtils.MEDIA_TYPE_BINARY;
+import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUtils.responseBody;
+import static org.apache.flink.util.Preconditions.checkState;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse;
 import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction;
 import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.Invocation;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.Invocation.Builder;
 import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest;
+import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.AsyncOperationResult;
 import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
 import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import okhttp3.HttpUrl;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Objects;
-
-import javax.annotation.Nonnull;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
 final class HttpFunction implements StatefulFunction {
 
-  private static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream");
-
   private final HttpFunctionSpec functionSpec;
   private final OkHttpClient client;
   private final HttpUrl url;
@@ -78,17 +74,18 @@ final class HttpFunction implements StatefulFunction {
 
   @Override
   public void invoke(Context context, Object input) {
-    if (input instanceof AsyncOperationResult) {
-      @SuppressWarnings("unchecked")
-      AsyncOperationResult<Any, Response> result = (AsyncOperationResult<Any, Response>) input;
-      onAsyncResult(context, result);
-    } else {
+    if (!(input instanceof AsyncOperationResult)) {
       onRequest(context, (Any) input);
+      return;
     }
+    @SuppressWarnings("unchecked")
+    AsyncOperationResult<ToFunction, Response> result =
+        (AsyncOperationResult<ToFunction, Response>) input;
+    onAsyncResult(context, result);
   }
 
-  private void onRequest(Context context, Any input) {
-    Invocation.Builder invocationBuilder = invocationBuilder(context, input);
+  private void onRequest(Context context, Any message) {
+    Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
     if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
       addToOrCreateBatch(invocationBuilder);
       return;
@@ -97,14 +94,23 @@ final class HttpFunction implements StatefulFunction {
     sendToFunction(context, invocationBuilder);
   }
 
-  private void onAsyncResult(Context context, AsyncOperationResult<Any, Response> asyncResult) {
+  private void addToOrCreateBatch(Invocation.Builder invocationBuilder) {
+    InvocationBatchRequest current = batch.get();
+    InvocationBatchRequest.Builder next =
+        (current == null) ? InvocationBatchRequest.newBuilder() : current.toBuilder();
+    next.addInvocations(invocationBuilder);
+    batch.set(next.build());
+  }
+
+  private void onAsyncResult(
+      Context context, AsyncOperationResult<ToFunction, Response> asyncResult) {
     if (asyncResult.unknown()) {
-      Any originalMessage = asyncResult.metadata();
-      Invocation.Builder invocationBuilder = invocationBuilder(context, originalMessage);
-      sendToFunction(context, invocationBuilder);
+      ToFunction batch = asyncResult.metadata();
+      sendToFunction(context, batch);
       return;
     }
-    InvocationResponse invocationResult = unpackInvocationResultOrThrow(asyncResult);
+    InvocationResponse invocationResult =
+        unpackInvocationResultOrThrow(context.self(), asyncResult);
     handleInvocationResponse(context, invocationResult);
     InvocationBatchRequest nextBatch = batch.get();
     if (nextBatch == null) {
@@ -123,6 +129,27 @@ final class HttpFunction implements StatefulFunction {
 
       context.send(to, message);
     }
+    handleStateMutations(invocationResult);
+  }
+
+  // --------------------------------------------------------------------------------
+  // State Management
+  // --------------------------------------------------------------------------------
+
+  private void addStates(ToFunction.InvocationBatchRequest.Builder batchBuilder) {
+    for (String stateName : functionSpec.states()) {
+      ToFunction.PersistedValue.Builder valueBuilder =
+          ToFunction.PersistedValue.newBuilder().setStateName(stateName);
+
+      byte[] stateValue = managedStates.get(stateName);
+      if (stateValue != null) {
+        valueBuilder.setStateValue(ByteString.copyFrom(stateValue));
+      }
+      batchBuilder.addState(valueBuilder);
+    }
+  }
+
+  private void handleStateMutations(InvocationResponse invocationResult) {
     for (FromFunction.PersistedValueMutation mutate : invocationResult.getStateMutationsList()) {
       final String stateName = mutate.getStateName();
       switch (mutate.getMutationType()) {
@@ -140,104 +167,65 @@ final class HttpFunction implements StatefulFunction {
     }
   }
 
-  private void addToOrCreateBatch(Builder invocationBuilder) {
-    batch.updateAndGet(
-        existingBatch -> {
-          final InvocationBatchRequest.Builder builder =
-              existingBatch != null
-                  ? existingBatch.toBuilder()
-                  : InvocationBatchRequest.newBuilder();
-          return builder.addInvocations(invocationBuilder).build();
-        });
-  }
-
   // --------------------------------------------------------------------------------
-  // Utilities
+  // Send Message to Remote Function
   // --------------------------------------------------------------------------------
-  private static Builder invocationBuilder(Context context, Any input) {
+
+  /**
+   * Returns an {@link Invocation.Builder} set with the input {@code message} and the caller
+   * information (is present).
+   */
+  private static Invocation.Builder singeInvocationBuilder(Context context, Any message) {
     Invocation.Builder invocationBuilder = Invocation.newBuilder();
     if (context.caller() != null) {
       invocationBuilder.setCaller(sdkAddressToPolyglotAddress(context.caller()));
     }
-    invocationBuilder.setArgument(input);
+    invocationBuilder.setArgument(message);
     return invocationBuilder;
   }
 
-  private void addStates(ToFunction.InvocationBatchRequest.Builder batchBuilder) {
-    for (String stateName : functionSpec.states()) {
-      ToFunction.PersistedValue.Builder valueBuilder =
-          ToFunction.PersistedValue.newBuilder().setStateName(stateName);
-
-      byte[] stateValue = managedStates.get(stateName);
-      if (stateValue != null) {
-        valueBuilder.setStateValue(ByteString.copyFrom(stateValue));
-      }
-      batchBuilder.addState(valueBuilder);
-    }
-  }
-
+  /**
+   * Sends a {@link InvocationBatchRequest} to the remote function consisting out of a single
+   * invocation represented by {@code invocationBuilder}.
+   */
   private void sendToFunction(Context context, Invocation.Builder invocationBuilder) {
     InvocationBatchRequest.Builder batchBuilder = InvocationBatchRequest.newBuilder();
     batchBuilder.addInvocations(invocationBuilder);
     sendToFunction(context, batchBuilder);
   }
 
+  /** Sends a {@link InvocationBatchRequest} to the remote function. */
   private void sendToFunction(Context context, InvocationBatchRequest.Builder batchBuilder) {
     batchBuilder.setTarget(sdkAddressToPolyglotAddress(context.self()));
     addStates(batchBuilder);
+    ToFunction toFunction = ToFunction.newBuilder().setInvocation(batchBuilder).build();
+    sendToFunction(context, toFunction);
+  }
+
+  private void sendToFunction(Context context, ToFunction toFunction) {
     Request request =
         new Request.Builder()
             .url(url)
-            .post(
-                RequestBody.create(
-                    MEDIA_TYPE_BINARY,
-                    ToFunction.newBuilder().setInvocation(batchBuilder).build().toByteArray()))
+            .post(RequestBody.create(MEDIA_TYPE_BINARY, toFunction.toByteArray()))
             .build();
-    OkHttpUtils.call(client, request);
+
+    CompletableFuture<Response> responseFuture = OkHttpUtils.call(client, request);
+    context.registerAsyncOperation(toFunction, responseFuture);
   }
 
-  private static InvocationResponse unpackInvocationResultOrThrow(
-      AsyncOperationResult<Any, Response> asyncResult) {
-    checkState(asyncResult.failure() || asyncResult.successful());
+  private InvocationResponse unpackInvocationResultOrThrow(
+      Address self, AsyncOperationResult<?, Response> asyncResult) {
+    checkState(!asyncResult.unknown());
     if (asyncResult.failure()) {
-      throw new IllegalStateException("", asyncResult.throwable());
+      throw new IllegalStateException(
+          "Failure forwarding a message to a remote function " + self, asyncResult.throwable());
     }
-    Response httpResponse = asyncResult.value();
-    checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code());
-    checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)");
-    checkState(
-        Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY),
-        "Wrong HTTP content-type %s",
-        httpResponse.body().contentType());
-    InputStream httpResponseBody = httpResponse.body().byteStream();
-    FromFunction fromFunction = parseProtobufOrThrow(httpResponseBody);
+    InputStream httpResponseBody = responseBody(asyncResult.value());
+    FromFunction fromFunction = parseProtobufOrThrow(FromFunction.parser(), httpResponseBody);
     checkState(
         fromFunction.hasInvocationResult(),
         "The received HTTP payload does not contain an InvocationResult, but rather [%s]",
         fromFunction);
     return fromFunction.getInvocationResult();
   }
-
-  private static FromFunction parseProtobufOrThrow(InputStream input) {
-    try {
-      return FromFunction.parseFrom(input);
-    } catch (IOException e) {
-      throw new IllegalStateException("Unable to parse a Protobuf message", e);
-    }
-  }
-
-  private static Address sdkAddressToPolyglotAddress(
-      @Nonnull org.apache.flink.statefun.sdk.Address sdkAddress) {
-    return Address.newBuilder()
-        .setNamespace(sdkAddress.type().namespace())
-        .setType(sdkAddress.type().name())
-        .setId(sdkAddress.id())
-        .build();
-  }
-
-  private static org.apache.flink.statefun.sdk.Address polyglotAddressToSdkAddress(
-      Address address) {
-    return new org.apache.flink.statefun.sdk.Address(
-        new FunctionType(address.getNamespace(), address.getType()), address.getId());
-  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index a546865..be027e0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -18,20 +18,22 @@
 
 package org.apache.flink.statefun.flink.core.httpfn;
 
+import java.util.Map;
+import okhttp3.OkHttpClient;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 
-import okhttp3.OkHttpClient;
-
-import java.util.Map;
-
 public class HttpFunctionProvider implements StatefulFunctionProvider {
   private final Map<FunctionType, HttpFunctionSpec> supportedTypes;
   private final OkHttpClient client;
 
   public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) {
     this.supportedTypes = supportedTypes;
-    this.client = new OkHttpClient();
+    final long timeoutMs = 30_000;
+    // TODO: add various timeouts to HttpFunctionSpec
+    this.client =
+        OkHttpUtils.newClient(
+            timeoutMs, timeoutMs, 2 * timeoutMs, timeoutMs, Integer.MAX_VALUE, Integer.MAX_VALUE);
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
index 1839c36..100c932 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java
@@ -16,25 +16,66 @@
 
 package org.apache.flink.statefun.flink.core.httpfn;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import okhttp3.Call;
 import okhttp3.Callback;
+import okhttp3.ConnectionPool;
+import okhttp3.Dispatcher;
+import okhttp3.MediaType;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
 
-import java.io.IOException;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-
 final class OkHttpUtils {
   private OkHttpUtils() {}
 
-  public static CompletableFuture<Response> call(OkHttpClient client, Request request) {
+  static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream");
+
+  static CompletableFuture<Response> call(OkHttpClient client, Request request) {
     CompletableFuture<Response> future = new CompletableFuture<>();
     client.newCall(request).enqueue(new CompletableFutureCallback(future));
     return future;
   }
 
+  static InputStream responseBody(Response httpResponse) {
+    checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code());
+    checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)");
+    checkState(
+        Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY),
+        "Wrong HTTP content-type %s",
+        httpResponse.body().contentType());
+    return httpResponse.body().byteStream();
+  }
+
+  static OkHttpClient newClient(
+      long readTimeoutMillis,
+      long writeTimeoutMillis,
+      long callTimeout,
+      long connectionTimeInMillis,
+      int maxRequestsPerHost,
+      int maxRequests) {
+    Dispatcher dispatcher = new Dispatcher();
+    dispatcher.setMaxRequestsPerHost(maxRequestsPerHost);
+    dispatcher.setMaxRequests(maxRequests);
+
+    return new OkHttpClient.Builder()
+        .connectTimeout(connectionTimeInMillis, TimeUnit.MILLISECONDS)
+        .writeTimeout(writeTimeoutMillis, TimeUnit.MILLISECONDS)
+        .readTimeout(readTimeoutMillis, TimeUnit.MILLISECONDS)
+        .callTimeout(callTimeout, TimeUnit.MILLISECONDS)
+        .dispatcher(dispatcher)
+        .connectionPool(new ConnectionPool())
+        .followRedirects(true)
+        .followSslRedirects(true)
+        .build();
+  }
+
   @SuppressWarnings("NullableProblems")
   private static final class CompletableFutureCallback implements Callback {
     private final CompletableFuture<Response> future;
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
index c659fe9..2ef6956 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
@@ -31,6 +31,8 @@ module:
             type: com.foo/world
           spec:
             endpoint: localhost:5959/statefun
+            states:
+              - seen_count
     routers:
       - router:
           meta: