You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/28 11:41:44 UTC
[flink-statefun] 01/10: [FLINK-17875] [core] Introduce JsonEntity
and implementations
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit ac07854eaad3fbc5eb89502f16c518fbce4aab77
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 28 12:28:12 2020 +0800
[FLINK-17875] [core] Introduce JsonEntity and implementations
A JsonEntity represents a section within a JsonModule that should be
parsed into application entity specs (of functions, routers, ingresses,
egresses, etc.) and bind to the module.
---
.../flink/core/jsonmodule/EgressJsonEntity.java | 57 +++++++
.../flink/core/jsonmodule/FunctionJsonEntity.java | 166 +++++++++++++++++++++
.../flink/core/jsonmodule/IngressJsonEntity.java | 68 +++++++++
.../statefun/flink/core/jsonmodule/JsonEntity.java | 39 +++++
.../flink/core/jsonmodule/RouterJsonEntity.java | 106 +++++++++++++
5 files changed, 436 insertions(+)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
new file mode 100644
index 0000000..766d936
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import static org.apache.flink.statefun.flink.core.jsonmodule.Pointers.EGRESSES_POINTER;
+
+import com.google.protobuf.Any;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
+import org.apache.flink.statefun.sdk.EgressType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+final class EgressJsonEntity implements JsonEntity {
+
+ @Override
+ public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+ final Iterable<? extends JsonNode> egressNodes =
+ Selectors.listAt(moduleSpecRootNode, EGRESSES_POINTER);
+
+ egressNodes.forEach(
+ egressNode -> {
+ binder.bindEgress(
+ new JsonEgressSpec<>(egressType(egressNode), egressId(egressNode), egressNode));
+ });
+ }
+
+ private static EgressType egressType(JsonNode spec) {
+ String typeString = Selectors.textAt(spec, Pointers.Egress.META_TYPE);
+ NamespaceNamePair nn = NamespaceNamePair.from(typeString);
+ return new EgressType(nn.namespace(), nn.name());
+ }
+
+ private static EgressIdentifier<Any> egressId(JsonNode spec) {
+ String egressId = Selectors.textAt(spec, Pointers.Egress.META_ID);
+ NamespaceNamePair nn = NamespaceNamePair.from(egressId);
+ return new EgressIdentifier<>(nn.namespace(), nn.name(), Any.class);
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
new file mode 100644
index 0000000..25f1981
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.flink.statefun.flink.core.common.Maps.transformValues;
+import static org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.time.Duration;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider;
+import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+import org.apache.flink.util.TimeUtils;
+
+final class FunctionJsonEntity implements JsonEntity {
+
+ private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
+ private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
+
+ @Override
+ public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+ final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode);
+
+ for (Map.Entry<Kind, Map<FunctionType, FunctionSpec>> entry :
+ parse(functionSpecNodes).entrySet()) {
+ StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue());
+ Set<FunctionType> functionTypes = entry.getValue().keySet();
+ for (FunctionType type : functionTypes) {
+ binder.bindFunctionProvider(type, provider);
+ }
+ }
+ }
+
+ private Map<Kind, Map<FunctionType, FunctionSpec>> parse(
+ Iterable<? extends JsonNode> functionSpecNodes) {
+ return StreamSupport.stream(functionSpecNodes.spliterator(), false)
+ .map(FunctionJsonEntity::parseFunctionSpec)
+ .collect(groupingBy(FunctionSpec::kind, groupByFunctionType()));
+ }
+
+ private static Iterable<? extends JsonNode> functionSpecNodes(JsonNode moduleSpecRootNode) {
+ return Selectors.listAt(moduleSpecRootNode, Pointers.FUNCTIONS_POINTER);
+ }
+
+ private static FunctionSpec parseFunctionSpec(JsonNode functionNode) {
+ String functionKind = Selectors.textAt(functionNode, Pointers.Functions.META_KIND);
+ FunctionSpec.Kind kind =
+ FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.getDefault()));
+ FunctionType functionType = functionType(functionNode);
+ switch (kind) {
+ case HTTP:
+ return new HttpFunctionSpec(
+ functionType,
+ functionUri(functionNode),
+ functionStates(functionNode),
+ maxRequestDuration(functionNode),
+ maxNumBatchRequests(functionNode));
+ case GRPC:
+ return new GrpcFunctionSpec(functionType, functionAddress(functionNode));
+ default:
+ throw new IllegalArgumentException("Unrecognized function kind " + functionKind);
+ }
+ }
+
+ private static List<String> functionStates(JsonNode functionNode) {
+ return Selectors.textListAt(functionNode, Pointers.Functions.FUNCTION_STATES);
+ }
+
+ private static int maxNumBatchRequests(JsonNode functionNode) {
+ return Selectors.optionalIntegerAt(
+ functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS)
+ .orElse(DEFAULT_MAX_NUM_BATCH_REQUESTS);
+ }
+
+ private static Duration maxRequestDuration(JsonNode functionNode) {
+ return Selectors.optionalTextAt(functionNode, Pointers.Functions.FUNCTION_TIMEOUT)
+ .map(TimeUtils::parseDuration)
+ .orElse(DEFAULT_HTTP_TIMEOUT);
+ }
+
+ private static FunctionType functionType(JsonNode functionNode) {
+ String namespaceName = Selectors.textAt(functionNode, Pointers.Functions.META_TYPE);
+ NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);
+ return new FunctionType(nn.namespace(), nn.name());
+ }
+
+ private static InetSocketAddress functionAddress(JsonNode functionNode) {
+ String host = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_HOSTNAME);
+ int port = Selectors.integerAt(functionNode, Pointers.Functions.FUNCTION_PORT);
+ return new InetSocketAddress(host, port);
+ }
+
+ private static URI functionUri(JsonNode functionNode) {
+ String uri = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_ENDPOINT);
+ URI typedUri = URI.create(uri);
+ @Nullable String scheme = typedUri.getScheme();
+ if (scheme == null) {
+ throw new IllegalArgumentException(
+ "Missing scheme in function endpoint "
+ + uri
+ + "; an http or https scheme must be provided.");
+ }
+ if (scheme.equalsIgnoreCase("http")
+ || scheme.equalsIgnoreCase("https")
+ || scheme.equalsIgnoreCase("http+unix")
+ || scheme.equalsIgnoreCase("https+unix")) {
+ return typedUri;
+ }
+ throw new IllegalArgumentException(
+ "Missing scheme in function endpoint "
+ + uri
+ + "; an http or https or http+unix or https+unix scheme must be provided.");
+ }
+
+ private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() {
+ return toMap(FunctionSpec::functionType, Function.identity());
+ }
+
+ private static StatefulFunctionProvider functionProvider(
+ Kind kind, Map<FunctionType, FunctionSpec> definedFunctions) {
+ switch (kind) {
+ case HTTP:
+ return new HttpFunctionProvider(
+ transformValues(definedFunctions, HttpFunctionSpec.class::cast));
+ case GRPC:
+ return new GrpcFunctionProvider(
+ transformValues(definedFunctions, GrpcFunctionSpec.class::cast));
+ default:
+ throw new IllegalStateException("Unexpected value: " + kind);
+ }
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
new file mode 100644
index 0000000..d30675c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import com.google.protobuf.Message;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
+import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
+import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
+import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
+import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+final class IngressJsonEntity implements JsonEntity {
+
+ @Override
+ public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+ final Iterable<? extends JsonNode> ingressNodes =
+ Selectors.listAt(moduleSpecRootNode, Pointers.INGRESSES_POINTER);
+
+ ingressNodes.forEach(
+ ingressNode -> {
+ final IngressIdentifier<Message> id = ingressId(ingressNode);
+ final IngressType type = ingressType(ingressNode);
+
+ binder.bindIngress(new JsonIngressSpec<>(type, id, ingressNode));
+ if (isAutoRoutableIngress(type)) {
+ binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
+ }
+ });
+ }
+
+ private static IngressType ingressType(JsonNode spec) {
+ String typeString = Selectors.textAt(spec, Pointers.Ingress.META_TYPE);
+ NamespaceNamePair nn = NamespaceNamePair.from(typeString);
+ return new IngressType(nn.namespace(), nn.name());
+ }
+
+ private static IngressIdentifier<Message> ingressId(JsonNode ingress) {
+ String ingressId = Selectors.textAt(ingress, Pointers.Ingress.META_ID);
+ NamespaceNamePair nn = NamespaceNamePair.from(ingressId);
+ return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
+ }
+
+ private static boolean isAutoRoutableIngress(IngressType ingressType) {
+ return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)
+ || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE);
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
new file mode 100644
index 0000000..9d40307
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+/**
+ * A {@link JsonEntity} represents a section within a {@link JsonModule} that should be parsed into
+ * application entity specs (of functions, routers, ingresses, egresses, etc.) and bind to the
+ * module.
+ */
+interface JsonEntity {
+
+ /**
+ * Parse the module spec node, and bind result specs to the module.
+ *
+ * @param binder used to bind specs to the module.
+ * @param moduleSpecNode the root module spec node.
+ * @param formatVersion the format version of the module spec.
+ */
+ void bind(Binder binder, JsonNode moduleSpecNode, FormatVersion formatVersion);
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
new file mode 100644
index 0000000..ee0ad47
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Optional;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.ResourceLocator;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap;
+import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.Router;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+final class RouterJsonEntity implements JsonEntity {
+
+ @Override
+ public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+ final Iterable<? extends JsonNode> routerNodes =
+ Selectors.listAt(moduleSpecRootNode, Pointers.ROUTERS_POINTER);
+
+ routerNodes.forEach(
+ routerNode -> {
+ // currently the only type of router supported in a module.yaml, is a protobuf
+ // dynamicMessage
+ // router once we will introduce further router types we should refactor this to be more
+ // dynamic.
+ requireProtobufRouterType(routerNode);
+
+ binder.bindIngressRouter(targetRouterIngress(routerNode), dynamicRouter(routerNode));
+ });
+ }
+
+ // ----------------------------------------------------------------------------------------------------------
+ // Routers
+ // ----------------------------------------------------------------------------------------------------------
+
+ private static Router<Message> dynamicRouter(JsonNode router) {
+ String addressTemplate = Selectors.textAt(router, Pointers.Routers.SPEC_TARGET);
+ String descriptorSetPath = Selectors.textAt(router, Pointers.Routers.SPEC_DESCRIPTOR);
+ String messageType = Selectors.textAt(router, Pointers.Routers.SPEC_MESSAGE_TYPE);
+
+ ProtobufDescriptorMap descriptorPath = protobufDescriptorMap(descriptorSetPath);
+ Optional<Descriptors.GenericDescriptor> maybeDescriptor =
+ descriptorPath.getDescriptorByName(messageType);
+ if (!maybeDescriptor.isPresent()) {
+ throw new IllegalStateException(
+ "Error while processing a router definition. Unable to locate a message "
+ + messageType
+ + " in a descriptor set "
+ + descriptorSetPath);
+ }
+ return ProtobufRouter.forAddressTemplate(
+ (Descriptors.Descriptor) maybeDescriptor.get(), addressTemplate);
+ }
+
+ private static ProtobufDescriptorMap protobufDescriptorMap(String descriptorSetPath) {
+ try {
+ URL url = ResourceLocator.findNamedResource(descriptorSetPath);
+ if (url == null) {
+ throw new IllegalArgumentException(
+ "Unable to locate a Protobuf descriptor set at " + descriptorSetPath);
+ }
+ return ProtobufDescriptorMap.from(url);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "Error while processing a router definition. Unable to read the descriptor set at "
+ + descriptorSetPath,
+ e);
+ }
+ }
+
+ private static IngressIdentifier<Message> targetRouterIngress(JsonNode routerNode) {
+ String targetIngress = Selectors.textAt(routerNode, Pointers.Routers.SPEC_INGRESS);
+ NamespaceNamePair nn = NamespaceNamePair.from(targetIngress);
+ return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
+ }
+
+ private static void requireProtobufRouterType(JsonNode routerNode) {
+ String routerType = Selectors.textAt(routerNode, Pointers.Routers.META_TYPE);
+ if (!routerType.equalsIgnoreCase("org.apache.flink.statefun.sdk/protobuf-router")) {
+ throw new IllegalStateException("Invalid router type " + routerType);
+ }
+ }
+}