You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/28 11:41:44 UTC

[flink-statefun] 01/10: [FLINK-17875] [core] Introduce JsonEntity and implementations

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit ac07854eaad3fbc5eb89502f16c518fbce4aab77
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 28 12:28:12 2020 +0800

    [FLINK-17875] [core] Introduce JsonEntity and implementations
    
    A JsonEntity represents a section within a JsonModule that is parsed
    into application entity specs (of functions, routers, ingresses,
    egresses, etc.), which are then bound to the module.
---
 .../flink/core/jsonmodule/EgressJsonEntity.java    |  57 +++++++
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 166 +++++++++++++++++++++
 .../flink/core/jsonmodule/IngressJsonEntity.java   |  68 +++++++++
 .../statefun/flink/core/jsonmodule/JsonEntity.java |  39 +++++
 .../flink/core/jsonmodule/RouterJsonEntity.java    | 106 +++++++++++++
 5 files changed, 436 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
new file mode 100644
index 0000000..766d936
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import static org.apache.flink.statefun.flink.core.jsonmodule.Pointers.EGRESSES_POINTER;
+
+import com.google.protobuf.Any;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
+import org.apache.flink.statefun.sdk.EgressType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+/** Binds every egress declared in a module spec as a {@link JsonEgressSpec}. */
+final class EgressJsonEntity implements JsonEntity {
+
+  /** Reads the nodes listed at {@code EGRESSES_POINTER} and binds one egress spec per node. */
+  @Override
+  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+    for (JsonNode node : Selectors.listAt(moduleSpecRootNode, EGRESSES_POINTER)) {
+      // the raw node is handed to the spec alongside its parsed type and identifier
+      binder.bindEgress(new JsonEgressSpec<>(egressType(node), egressId(node), node));
+    }
+  }
+
+  /** Builds the egress type from the namespace/name pair found at {@code META_TYPE}. */
+  private static EgressType egressType(JsonNode spec) {
+    final NamespaceNamePair pair =
+        NamespaceNamePair.from(Selectors.textAt(spec, Pointers.Egress.META_TYPE));
+    return new EgressType(pair.namespace(), pair.name());
+  }
+
+  /** Builds the egress identifier from the namespace/name pair found at {@code META_ID}. */
+  private static EgressIdentifier<Any> egressId(JsonNode spec) {
+    final NamespaceNamePair pair =
+        NamespaceNamePair.from(Selectors.textAt(spec, Pointers.Egress.META_ID));
+    return new EgressIdentifier<>(pair.namespace(), pair.name(), Any.class);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
new file mode 100644
index 0000000..25f1981
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.flink.statefun.flink.core.common.Maps.transformValues;
+import static org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.time.Duration;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider;
+import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+import org.apache.flink.util.TimeUtils;
+
+final class FunctionJsonEntity implements JsonEntity {
+
+  private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
+  private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
+
+  @Override
+  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+    final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode);
+
+    for (Map.Entry<Kind, Map<FunctionType, FunctionSpec>> entry :
+        parse(functionSpecNodes).entrySet()) {
+      StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue());
+      Set<FunctionType> functionTypes = entry.getValue().keySet();
+      for (FunctionType type : functionTypes) {
+        binder.bindFunctionProvider(type, provider);
+      }
+    }
+  }
+
+  private Map<Kind, Map<FunctionType, FunctionSpec>> parse(
+      Iterable<? extends JsonNode> functionSpecNodes) {
+    return StreamSupport.stream(functionSpecNodes.spliterator(), false)
+        .map(FunctionJsonEntity::parseFunctionSpec)
+        .collect(groupingBy(FunctionSpec::kind, groupByFunctionType()));
+  }
+
+  private static Iterable<? extends JsonNode> functionSpecNodes(JsonNode moduleSpecRootNode) {
+    return Selectors.listAt(moduleSpecRootNode, Pointers.FUNCTIONS_POINTER);
+  }
+
+  private static FunctionSpec parseFunctionSpec(JsonNode functionNode) {
+    String functionKind = Selectors.textAt(functionNode, Pointers.Functions.META_KIND);
+    FunctionSpec.Kind kind =
+        FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.getDefault()));
+    FunctionType functionType = functionType(functionNode);
+    switch (kind) {
+      case HTTP:
+        return new HttpFunctionSpec(
+            functionType,
+            functionUri(functionNode),
+            functionStates(functionNode),
+            maxRequestDuration(functionNode),
+            maxNumBatchRequests(functionNode));
+      case GRPC:
+        return new GrpcFunctionSpec(functionType, functionAddress(functionNode));
+      default:
+        throw new IllegalArgumentException("Unrecognized function kind " + functionKind);
+    }
+  }
+
+  private static List<String> functionStates(JsonNode functionNode) {
+    return Selectors.textListAt(functionNode, Pointers.Functions.FUNCTION_STATES);
+  }
+
+  private static int maxNumBatchRequests(JsonNode functionNode) {
+    return Selectors.optionalIntegerAt(
+            functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS)
+        .orElse(DEFAULT_MAX_NUM_BATCH_REQUESTS);
+  }
+
+  private static Duration maxRequestDuration(JsonNode functionNode) {
+    return Selectors.optionalTextAt(functionNode, Pointers.Functions.FUNCTION_TIMEOUT)
+        .map(TimeUtils::parseDuration)
+        .orElse(DEFAULT_HTTP_TIMEOUT);
+  }
+
+  private static FunctionType functionType(JsonNode functionNode) {
+    String namespaceName = Selectors.textAt(functionNode, Pointers.Functions.META_TYPE);
+    NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);
+    return new FunctionType(nn.namespace(), nn.name());
+  }
+
+  private static InetSocketAddress functionAddress(JsonNode functionNode) {
+    String host = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_HOSTNAME);
+    int port = Selectors.integerAt(functionNode, Pointers.Functions.FUNCTION_PORT);
+    return new InetSocketAddress(host, port);
+  }
+
+  private static URI functionUri(JsonNode functionNode) {
+    String uri = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_ENDPOINT);
+    URI typedUri = URI.create(uri);
+    @Nullable String scheme = typedUri.getScheme();
+    if (scheme == null) {
+      throw new IllegalArgumentException(
+          "Missing scheme in function endpoint "
+              + uri
+              + "; an http or https scheme must be provided.");
+    }
+    if (scheme.equalsIgnoreCase("http")
+        || scheme.equalsIgnoreCase("https")
+        || scheme.equalsIgnoreCase("http+unix")
+        || scheme.equalsIgnoreCase("https+unix")) {
+      return typedUri;
+    }
+    throw new IllegalArgumentException(
+        "Missing scheme in function endpoint "
+            + uri
+            + "; an http or https or http+unix or https+unix scheme must be provided.");
+  }
+
+  private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() {
+    return toMap(FunctionSpec::functionType, Function.identity());
+  }
+
+  private static StatefulFunctionProvider functionProvider(
+      Kind kind, Map<FunctionType, FunctionSpec> definedFunctions) {
+    switch (kind) {
+      case HTTP:
+        return new HttpFunctionProvider(
+            transformValues(definedFunctions, HttpFunctionSpec.class::cast));
+      case GRPC:
+        return new GrpcFunctionProvider(
+            transformValues(definedFunctions, GrpcFunctionSpec.class::cast));
+      default:
+        throw new IllegalStateException("Unexpected value: " + kind);
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
new file mode 100644
index 0000000..d30675c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import com.google.protobuf.Message;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
+import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
+import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
+import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
+import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+/** Binds every ingress declared in a module spec, plus a router for auto-routable ones. */
+final class IngressJsonEntity implements JsonEntity {
+
+  /** Binds one {@link JsonIngressSpec} per node listed at {@code Pointers.INGRESSES_POINTER}. */
+  @Override
+  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+    for (JsonNode node : Selectors.listAt(moduleSpecRootNode, Pointers.INGRESSES_POINTER)) {
+      final IngressIdentifier<Message> identifier = ingressId(node);
+      final IngressType typeOfIngress = ingressType(node);
+      binder.bindIngress(new JsonIngressSpec<>(typeOfIngress, identifier, node));
+      // auto-routable ingress types additionally get an AutoRoutableProtobufRouter bound
+      if (isAutoRoutableIngress(typeOfIngress)) {
+        binder.bindIngressRouter(identifier, new AutoRoutableProtobufRouter());
+      }
+    }
+  }
+
+  /** Builds the ingress type from the namespace/name pair found at {@code META_TYPE}. */
+  private static IngressType ingressType(JsonNode spec) {
+    final NamespaceNamePair pair =
+        NamespaceNamePair.from(Selectors.textAt(spec, Pointers.Ingress.META_TYPE));
+    return new IngressType(pair.namespace(), pair.name());
+  }
+
+  /** Builds the ingress identifier from the namespace/name pair found at {@code META_ID}. */
+  private static IngressIdentifier<Message> ingressId(JsonNode ingress) {
+    final NamespaceNamePair pair =
+        NamespaceNamePair.from(Selectors.textAt(ingress, Pointers.Ingress.META_ID));
+    return new IngressIdentifier<>(Message.class, pair.namespace(), pair.name());
+  }
+
+  private static boolean isAutoRoutableIngress(IngressType ingressType) {
+    return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)
+        || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
new file mode 100644
index 0000000..9d40307
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+/**
+ * A {@link JsonEntity} represents a section within a {@link JsonModule} that is parsed into
+ * application entity specs (of functions, routers, ingresses, egresses, etc.), which are bound to
+ * the module.
+ */
+interface JsonEntity {
+
+  /**
+   * Parses the module spec node, and binds the resulting specs to the module.
+   *
+   * @param binder used to bind specs to the module.
+   * @param moduleSpecNode the root module spec node.
+   * @param formatVersion the format version of the module spec.
+   */
+  void bind(Binder binder, JsonNode moduleSpecNode, FormatVersion formatVersion);
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
new file mode 100644
index 0000000..ee0ad47
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Optional;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.ResourceLocator;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap;
+import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.Router;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+/** Binds the routers declared in a module spec; only Protobuf routers are supported. */
+final class RouterJsonEntity implements JsonEntity {
+
+  /** Binds a Protobuf router to its target ingress for every node at {@code ROUTERS_POINTER}. */
+  @Override
+  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+    for (JsonNode routerNode : Selectors.listAt(moduleSpecRootNode, Pointers.ROUTERS_POINTER)) {
+      // Currently the only type of router supported in a module.yaml is a Protobuf dynamic
+      // message router. Once further router types are introduced, this should be refactored to
+      // be more dynamic.
+      requireProtobufRouterType(routerNode);
+
+      binder.bindIngressRouter(targetRouterIngress(routerNode), dynamicRouter(routerNode));
+    }
+  }
+
+  /** Creates a router for the named message type, failing if the descriptor set lacks it. */
+  private static Router<Message> dynamicRouter(JsonNode router) {
+    String addressTemplate = Selectors.textAt(router, Pointers.Routers.SPEC_TARGET);
+    String descriptorSetPath = Selectors.textAt(router, Pointers.Routers.SPEC_DESCRIPTOR);
+    String messageType = Selectors.textAt(router, Pointers.Routers.SPEC_MESSAGE_TYPE);
+
+    ProtobufDescriptorMap descriptorMap = protobufDescriptorMap(descriptorSetPath);
+    Optional<Descriptors.GenericDescriptor> maybeDescriptor =
+        descriptorMap.getDescriptorByName(messageType);
+    // getDescriptorByName returns a GenericDescriptor, which is broader than the message
+    // Descriptor needed by the router. A name that resolves to anything else is reported like a
+    // missing message, instead of surfacing as a ClassCastException from the cast below.
+    if (!maybeDescriptor.filter(Descriptors.Descriptor.class::isInstance).isPresent()) {
+      throw new IllegalStateException(
+          "Error while processing a router definition. Unable to locate a message "
+              + messageType
+              + " in a descriptor set "
+              + descriptorSetPath);
+    }
+    return ProtobufRouter.forAddressTemplate(
+        (Descriptors.Descriptor) maybeDescriptor.get(), addressTemplate);
+  }
+
+  /** Loads the descriptor set located at the given path; fails if it cannot be found or read. */
+  private static ProtobufDescriptorMap protobufDescriptorMap(String descriptorSetPath) {
+    try {
+      URL url = ResourceLocator.findNamedResource(descriptorSetPath);
+      if (url == null) {
+        throw new IllegalArgumentException(
+            "Unable to locate a Protobuf descriptor set at " + descriptorSetPath);
+      }
+      return ProtobufDescriptorMap.from(url);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "Error while processing a router definition. Unable to read the descriptor set at  "
+              + descriptorSetPath,
+          e);
+    }
+  }
+
+  /** Builds the identifier of the ingress named at {@code SPEC_INGRESS} in the router node. */
+  private static IngressIdentifier<Message> targetRouterIngress(JsonNode routerNode) {
+    String targetIngress = Selectors.textAt(routerNode, Pointers.Routers.SPEC_INGRESS);
+    NamespaceNamePair nn = NamespaceNamePair.from(targetIngress);
+    return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
+  }
+
+  /** Throws unless the node's type is the Protobuf router type, compared ignoring case. */
+  private static void requireProtobufRouterType(JsonNode routerNode) {
+    String routerType = Selectors.textAt(routerNode, Pointers.Routers.META_TYPE);
+    if (!routerType.equalsIgnoreCase("org.apache.flink.statefun.sdk/protobuf-router")) {
+      throw new IllegalStateException("Invalid router type " + routerType);
+    }
+  }
+}