Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/28 11:41:43 UTC

[flink-statefun] branch master updated (c14d07a -> 4b96b43)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from c14d07a  [FLINK-17963] [core] Revert execution environment patching in StatefulFunctionsJob
     new ac07854  [FLINK-17875] [core] Introduce JsonEntity and implementations
     new 758076b  [FLINK-17875] [core] Wire in JsonEntities into JsonModule
     new 4a920f9  [FLINK-17875] [core] Move default function config values to HttpFunctionSpec
     new 1fa4664  [FLINK-17875] [core] Move JSON Pointers to individual entities
     new 0023360  [FLINK-17875] [core] Introduce StateSpec for remote function specs
     new b852dcc  [FLINK-17875] [core] Wire in StateSpec into RequestReplyFunction
     new 3c8e1f4  [FLINK-17875] [core] Add format version 2.0
     new d8a2210  [FLINK-17875] [core] Update FunctionJsonEntity to recognize V2 state format
     new 44400f9  [FLINK-17875] [core] Parameterize JsonModuleTest to cover all module format versions
     new 4b96b43  [FLINK-17997] [legal] Revert manual merging of AWS KPL's THIRD_PARTY_NOTICE content

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/core/httpfn/HttpFunctionSpec.java        |  52 +++-
 .../StateSpec.java}                                |  27 +-
 .../flink/core/jsonmodule/EgressJsonEntity.java    |  63 +++++
 .../flink/core/jsonmodule/FormatVersion.java       |   5 +-
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 232 ++++++++++++++++
 .../flink/core/jsonmodule/IngressJsonEntity.java   |  76 ++++++
 .../{JsonModuleFactory.java => JsonEntity.java}    |  37 ++-
 .../statefun/flink/core/jsonmodule/JsonModule.java | 294 ++-------------------
 .../flink/core/jsonmodule/JsonServiceLoader.java   |  38 +--
 .../statefun/flink/core/jsonmodule/Pointers.java   |  87 ------
 .../flink/core/jsonmodule/RouterJsonEntity.java    | 120 +++++++++
 .../flink/core/reqreply/RequestReplyFunction.java  |  39 ++-
 .../flink/core/jsonmodule/JsonModuleTest.java      |  38 ++-
 .../core/reqreply/RequestReplyFunctionTest.java    |   3 +-
 .../{bar-module => module-v1_0}/module.yaml        |   0
 .../{bar-module => module-v2_0}/module.yaml        |   7 +-
 statefun-flink/statefun-flink-distribution/pom.xml |  16 --
 .../src/main/resources/META-INF/NOTICE             | 197 --------------
 18 files changed, 674 insertions(+), 657 deletions(-)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/{common/MailboxExecutorFacade.java => httpfn/StateSpec.java} (65%)
 create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
 create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
 create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
 rename statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/{JsonModuleFactory.java => JsonEntity.java} (54%)
 delete mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
 copy statefun-flink/statefun-flink-core/src/test/resources/{bar-module => module-v1_0}/module.yaml (100%)
 rename statefun-flink/statefun-flink-core/src/test/resources/{bar-module => module-v2_0}/module.yaml (95%)


[flink-statefun] 01/10: [FLINK-17875] [core] Introduce JsonEntity and implementations

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit ac07854eaad3fbc5eb89502f16c518fbce4aab77
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 28 12:28:12 2020 +0800

    [FLINK-17875] [core] Introduce JsonEntity and implementations
    
    A JsonEntity represents a section within a JsonModule that is parsed
    into application entity specs (of functions, routers, ingresses,
    egresses, etc.), which are then bound to the module.
---
 .../flink/core/jsonmodule/EgressJsonEntity.java    |  57 +++++++
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 166 +++++++++++++++++++++
 .../flink/core/jsonmodule/IngressJsonEntity.java   |  68 +++++++++
 .../statefun/flink/core/jsonmodule/JsonEntity.java |  39 +++++
 .../flink/core/jsonmodule/RouterJsonEntity.java    | 106 +++++++++++++
 5 files changed, 436 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
new file mode 100644
index 0000000..766d936
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import static org.apache.flink.statefun.flink.core.jsonmodule.Pointers.EGRESSES_POINTER;
+
+import com.google.protobuf.Any;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
+import org.apache.flink.statefun.sdk.EgressType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+final class EgressJsonEntity implements JsonEntity {
+
+  @Override
+  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+    final Iterable<? extends JsonNode> egressNodes =
+        Selectors.listAt(moduleSpecRootNode, EGRESSES_POINTER);
+
+    egressNodes.forEach(
+        egressNode -> {
+          binder.bindEgress(
+              new JsonEgressSpec<>(egressType(egressNode), egressId(egressNode), egressNode));
+        });
+  }
+
+  private static EgressType egressType(JsonNode spec) {
+    String typeString = Selectors.textAt(spec, Pointers.Egress.META_TYPE);
+    NamespaceNamePair nn = NamespaceNamePair.from(typeString);
+    return new EgressType(nn.namespace(), nn.name());
+  }
+
+  private static EgressIdentifier<Any> egressId(JsonNode spec) {
+    String egressId = Selectors.textAt(spec, Pointers.Egress.META_ID);
+    NamespaceNamePair nn = NamespaceNamePair.from(egressId);
+    return new EgressIdentifier<>(nn.namespace(), nn.name(), Any.class);
+  }
+}
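
Reviewer note: the egress type and id above are both read as text and split into a namespace and a name. The following is a minimal sketch of that split, assuming identifiers follow a "<namespace>/<name>" layout as in the router type string "org.apache.flink.statefun.sdk/protobuf-router" that appears later in this commit. NamespaceNameSketch is a hypothetical stand-in, not the real NamespaceNamePair, and splitting at the last slash is an assumption about its behavior.

```java
import java.util.Objects;

/** Hypothetical stand-in for NamespaceNamePair; not the StateFun class. */
final class NamespaceNameSketch {
  final String namespace;
  final String name;

  private NamespaceNameSketch(String namespace, String name) {
    this.namespace = Objects.requireNonNull(namespace);
    this.name = Objects.requireNonNull(name);
  }

  /** Splits at the last '/', so the namespace part may itself contain slashes (assumption). */
  static NamespaceNameSketch from(String namespaceAndName) {
    Objects.requireNonNull(namespaceAndName);
    int pos = namespaceAndName.lastIndexOf('/');
    // Reject a missing slash, an empty namespace ("/name"), and an empty name ("namespace/").
    if (pos <= 0 || pos == namespaceAndName.length() - 1) {
      throw new IllegalArgumentException(
          namespaceAndName + " does not conform to the <namespace>/<name> format");
    }
    return new NamespaceNameSketch(
        namespaceAndName.substring(0, pos), namespaceAndName.substring(pos + 1));
  }
}
```

With this sketch, the router type string mentioned above would yield the namespace "org.apache.flink.statefun.sdk" and the name "protobuf-router".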
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
new file mode 100644
index 0000000..25f1981
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.flink.statefun.flink.core.common.Maps.transformValues;
+import static org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.time.Duration;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider;
+import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+import org.apache.flink.util.TimeUtils;
+
+final class FunctionJsonEntity implements JsonEntity {
+
+  private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
+  private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
+
+  @Override
+  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+    final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode);
+
+    for (Map.Entry<Kind, Map<FunctionType, FunctionSpec>> entry :
+        parse(functionSpecNodes).entrySet()) {
+      StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue());
+      Set<FunctionType> functionTypes = entry.getValue().keySet();
+      for (FunctionType type : functionTypes) {
+        binder.bindFunctionProvider(type, provider);
+      }
+    }
+  }
+
+  private Map<Kind, Map<FunctionType, FunctionSpec>> parse(
+      Iterable<? extends JsonNode> functionSpecNodes) {
+    return StreamSupport.stream(functionSpecNodes.spliterator(), false)
+        .map(FunctionJsonEntity::parseFunctionSpec)
+        .collect(groupingBy(FunctionSpec::kind, groupByFunctionType()));
+  }
+
+  private static Iterable<? extends JsonNode> functionSpecNodes(JsonNode moduleSpecRootNode) {
+    return Selectors.listAt(moduleSpecRootNode, Pointers.FUNCTIONS_POINTER);
+  }
+
+  private static FunctionSpec parseFunctionSpec(JsonNode functionNode) {
+    String functionKind = Selectors.textAt(functionNode, Pointers.Functions.META_KIND);
+    FunctionSpec.Kind kind =
+        FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.ROOT));
+    FunctionType functionType = functionType(functionNode);
+    switch (kind) {
+      case HTTP:
+        return new HttpFunctionSpec(
+            functionType,
+            functionUri(functionNode),
+            functionStates(functionNode),
+            maxRequestDuration(functionNode),
+            maxNumBatchRequests(functionNode));
+      case GRPC:
+        return new GrpcFunctionSpec(functionType, functionAddress(functionNode));
+      default:
+        throw new IllegalArgumentException("Unrecognized function kind " + functionKind);
+    }
+  }
+
+  private static List<String> functionStates(JsonNode functionNode) {
+    return Selectors.textListAt(functionNode, Pointers.Functions.FUNCTION_STATES);
+  }
+
+  private static int maxNumBatchRequests(JsonNode functionNode) {
+    return Selectors.optionalIntegerAt(
+            functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS)
+        .orElse(DEFAULT_MAX_NUM_BATCH_REQUESTS);
+  }
+
+  private static Duration maxRequestDuration(JsonNode functionNode) {
+    return Selectors.optionalTextAt(functionNode, Pointers.Functions.FUNCTION_TIMEOUT)
+        .map(TimeUtils::parseDuration)
+        .orElse(DEFAULT_HTTP_TIMEOUT);
+  }
+
+  private static FunctionType functionType(JsonNode functionNode) {
+    String namespaceName = Selectors.textAt(functionNode, Pointers.Functions.META_TYPE);
+    NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);
+    return new FunctionType(nn.namespace(), nn.name());
+  }
+
+  private static InetSocketAddress functionAddress(JsonNode functionNode) {
+    String host = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_HOSTNAME);
+    int port = Selectors.integerAt(functionNode, Pointers.Functions.FUNCTION_PORT);
+    return new InetSocketAddress(host, port);
+  }
+
+  private static URI functionUri(JsonNode functionNode) {
+    String uri = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_ENDPOINT);
+    URI typedUri = URI.create(uri);
+    @Nullable String scheme = typedUri.getScheme();
+    if (scheme == null) {
+      throw new IllegalArgumentException(
+          "Missing scheme in function endpoint "
+              + uri
+              + "; an http or https scheme must be provided.");
+    }
+    if (scheme.equalsIgnoreCase("http")
+        || scheme.equalsIgnoreCase("https")
+        || scheme.equalsIgnoreCase("http+unix")
+        || scheme.equalsIgnoreCase("https+unix")) {
+      return typedUri;
+    }
+    throw new IllegalArgumentException(
+        "Unsupported scheme in function endpoint "
+            + uri
+            + "; an http, https, http+unix, or https+unix scheme must be provided.");
+  }
+
+  private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() {
+    return toMap(FunctionSpec::functionType, Function.identity());
+  }
+
+  private static StatefulFunctionProvider functionProvider(
+      Kind kind, Map<FunctionType, FunctionSpec> definedFunctions) {
+    switch (kind) {
+      case HTTP:
+        return new HttpFunctionProvider(
+            transformValues(definedFunctions, HttpFunctionSpec.class::cast));
+      case GRPC:
+        return new GrpcFunctionProvider(
+            transformValues(definedFunctions, GrpcFunctionSpec.class::cast));
+      default:
+        throw new IllegalStateException("Unexpected value: " + kind);
+    }
+  }
+}
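
Reviewer note: the parse step in FunctionJsonEntity groups function specs first by kind and then by function type, so that one provider can be created per kind. The following is a minimal sketch of that two-level grouping using only the JDK. GroupByKindSketch, Spec, and the "example/..." type names are hypothetical simplifications, not the real FunctionSpec classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class GroupByKindSketch {

  enum Kind {
    HTTP,
    GRPC
  }

  /** Hypothetical, simplified spec: a kind plus a "namespace/name" function type. */
  static final class Spec {
    private final Kind kind;
    private final String functionType;

    Spec(Kind kind, String functionType) {
      this.kind = kind;
      this.functionType = functionType;
    }

    Kind kind() {
      return kind;
    }

    String functionType() {
      return functionType;
    }
  }

  /** Outer key: kind. Inner key: function type. */
  static Map<Kind, Map<String, Spec>> group(List<Spec> specs) {
    return specs.stream()
        .collect(
            Collectors.groupingBy(
                Spec::kind, Collectors.toMap(Spec::functionType, Function.identity())));
  }

  public static void main(String[] args) {
    Map<Kind, Map<String, Spec>> grouped =
        group(
            Arrays.asList(
                new Spec(Kind.HTTP, "example/greeter"),
                new Spec(Kind.HTTP, "example/counter"),
                new Spec(Kind.GRPC, "example/legacy")));
    // One provider would be created per outer entry and bound to every inner key.
    grouped.forEach((kind, byType) -> System.out.println(kind + " -> " + byType.keySet()));
  }
}
```

Note that the two-argument Collectors.toMap throws an IllegalStateException on duplicate keys, so declaring the same function type twice under one kind fails at parse time in this sketch.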
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
new file mode 100644
index 0000000..d30675c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import com.google.protobuf.Message;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
+import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
+import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
+import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
+import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+final class IngressJsonEntity implements JsonEntity {
+
+  @Override
+  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+    final Iterable<? extends JsonNode> ingressNodes =
+        Selectors.listAt(moduleSpecRootNode, Pointers.INGRESSES_POINTER);
+
+    ingressNodes.forEach(
+        ingressNode -> {
+          final IngressIdentifier<Message> id = ingressId(ingressNode);
+          final IngressType type = ingressType(ingressNode);
+
+          binder.bindIngress(new JsonIngressSpec<>(type, id, ingressNode));
+          if (isAutoRoutableIngress(type)) {
+            binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
+          }
+        });
+  }
+
+  private static IngressType ingressType(JsonNode spec) {
+    String typeString = Selectors.textAt(spec, Pointers.Ingress.META_TYPE);
+    NamespaceNamePair nn = NamespaceNamePair.from(typeString);
+    return new IngressType(nn.namespace(), nn.name());
+  }
+
+  private static IngressIdentifier<Message> ingressId(JsonNode ingress) {
+    String ingressId = Selectors.textAt(ingress, Pointers.Ingress.META_ID);
+    NamespaceNamePair nn = NamespaceNamePair.from(ingressId);
+    return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
+  }
+
+  private static boolean isAutoRoutableIngress(IngressType ingressType) {
+    return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)
+        || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
new file mode 100644
index 0000000..9d40307
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+/**
+ * A {@link JsonEntity} represents a section within a {@link JsonModule} that is parsed into
+ * application entity specs (of functions, routers, ingresses, egresses, etc.), which are then
+ * bound to the module.
+ */
+interface JsonEntity {
+
+  /**
+   * Parses the module spec node and binds the resulting specs to the module.
+   *
+   * @param binder used to bind specs to the module.
+   * @param moduleSpecNode the root module spec node.
+   * @param formatVersion the format version of the module spec.
+   */
+  void bind(Binder binder, JsonNode moduleSpecNode, FormatVersion formatVersion);
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
new file mode 100644
index 0000000..ee0ad47
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Optional;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.ResourceLocator;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap;
+import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.Router;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+final class RouterJsonEntity implements JsonEntity {
+
+  @Override
+  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+    final Iterable<? extends JsonNode> routerNodes =
+        Selectors.listAt(moduleSpecRootNode, Pointers.ROUTERS_POINTER);
+
+    routerNodes.forEach(
+        routerNode -> {
+          // Currently, the only type of router supported in a module.yaml is a Protobuf
+          // dynamic message router.
+          // Once further router types are introduced, this should be refactored to be more
+          // dynamic.
+          requireProtobufRouterType(routerNode);
+
+          binder.bindIngressRouter(targetRouterIngress(routerNode), dynamicRouter(routerNode));
+        });
+  }
+
+  // ----------------------------------------------------------------------------------------------------------
+  // Routers
+  // ----------------------------------------------------------------------------------------------------------
+
+  private static Router<Message> dynamicRouter(JsonNode router) {
+    String addressTemplate = Selectors.textAt(router, Pointers.Routers.SPEC_TARGET);
+    String descriptorSetPath = Selectors.textAt(router, Pointers.Routers.SPEC_DESCRIPTOR);
+    String messageType = Selectors.textAt(router, Pointers.Routers.SPEC_MESSAGE_TYPE);
+
+    ProtobufDescriptorMap descriptorPath = protobufDescriptorMap(descriptorSetPath);
+    Optional<Descriptors.GenericDescriptor> maybeDescriptor =
+        descriptorPath.getDescriptorByName(messageType);
+    if (!maybeDescriptor.isPresent()) {
+      throw new IllegalStateException(
+          "Error while processing a router definition. Unable to locate a message "
+              + messageType
+              + " in a descriptor set "
+              + descriptorSetPath);
+    }
+    return ProtobufRouter.forAddressTemplate(
+        (Descriptors.Descriptor) maybeDescriptor.get(), addressTemplate);
+  }
+
+  private static ProtobufDescriptorMap protobufDescriptorMap(String descriptorSetPath) {
+    try {
+      URL url = ResourceLocator.findNamedResource(descriptorSetPath);
+      if (url == null) {
+        throw new IllegalArgumentException(
+            "Unable to locate a Protobuf descriptor set at " + descriptorSetPath);
+      }
+      return ProtobufDescriptorMap.from(url);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "Error while processing a router definition. Unable to read the descriptor set at "
+              + descriptorSetPath,
+          e);
+    }
+  }
+
+  private static IngressIdentifier<Message> targetRouterIngress(JsonNode routerNode) {
+    String targetIngress = Selectors.textAt(routerNode, Pointers.Routers.SPEC_INGRESS);
+    NamespaceNamePair nn = NamespaceNamePair.from(targetIngress);
+    return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
+  }
+
+  private static void requireProtobufRouterType(JsonNode routerNode) {
+    String routerType = Selectors.textAt(routerNode, Pointers.Routers.META_TYPE);
+    if (!routerType.equalsIgnoreCase("org.apache.flink.statefun.sdk/protobuf-router")) {
+      throw new IllegalStateException("Invalid router type " + routerType);
+    }
+  }
+}
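
Reviewer note: the JsonEntity contract introduced in this commit gives every section of a module spec its own parser, each with the same bind signature. The following is a minimal sketch of how a module might drive such entities. How JsonModule actually wires them is not part of this commit, so the configure loop is an assumption. EntitySketch, RecordingBinder, and the plain Map standing in for a JsonNode are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class EntitySketch {

  /** Hypothetical stand-in for the module binder: it only records what was bound. */
  static final class RecordingBinder {
    final List<String> bound = new ArrayList<>();

    void bind(String what) {
      bound.add(what);
    }
  }

  /** Mirrors the shape of JsonEntity#bind, with a plain Map in place of a JsonNode. */
  interface Entity {
    void bind(RecordingBinder binder, Map<String, List<String>> moduleSpec, String formatVersion);
  }

  /** One entity per section of the spec; each entity reads only its own section. */
  static Entity section(String sectionName) {
    return (binder, spec, version) ->
        spec.getOrDefault(sectionName, Collections.emptyList())
            .forEach(id -> binder.bind(sectionName + ":" + id));
  }

  /** Runs every entity against the same spec and binder, in a fixed order. */
  static List<String> configure(Map<String, List<String>> moduleSpec, String formatVersion) {
    RecordingBinder binder = new RecordingBinder();
    List<Entity> entities =
        Arrays.asList(
            section("functions"), section("routers"), section("ingresses"), section("egresses"));
    for (Entity entity : entities) {
      entity.bind(binder, moduleSpec, formatVersion);
    }
    return binder.bound;
  }
}
```

The appeal of this shape is that supporting a new section, or a new format version for one section, touches only one entity rather than a single large module class.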


[flink-statefun] 09/10: [FLINK-17875] [core] Parameterize JsonModuleTest to cover all module format versions

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 44400f97cbc30f040994af2fcda460b88576aaea
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 27 13:39:08 2020 +0800

    [FLINK-17875] [core] Parameterize JsonModuleTest to cover all module format versions
    
    This closes #119.
    This closes #120.
---
 .../flink/core/jsonmodule/JsonModuleTest.java      | 28 ++++++++++++++++++----
 .../{bar-module => module-v1_0}/module.yaml        |  0
 .../{bar-module => module-v2_0}/module.yaml        |  7 +++---
 3 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
index 28ee1fb..44b5bbe 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertThat;
 import com.google.protobuf.Any;
 import com.google.protobuf.Message;
 import java.net.URL;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
@@ -32,19 +34,35 @@ import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 import org.apache.flink.statefun.sdk.io.IngressIdentifier;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class JsonModuleTest {
 
+  @Parameterized.Parameters(name = "Format version = {0}, module path = \"{1}\"")
+  public static Collection<?> modules() {
+    return Arrays.asList(
+        new Object[] {FormatVersion.v1_0, "module-v1_0/module.yaml"},
+        new Object[] {FormatVersion.v2_0, "module-v2_0/module.yaml"});
+  }
+
+  private final String modulePath;
+
+  public JsonModuleTest(FormatVersion ignored, String modulePath) {
+    this.modulePath = modulePath;
+  }
+
   @Test
   public void exampleUsage() {
-    StatefulFunctionModule module = fromPath("bar-module/module.yaml");
+    StatefulFunctionModule module = fromPath(modulePath);
 
     assertThat(module, notNullValue());
   }
 
   @Test
   public void testFunctions() {
-    StatefulFunctionModule module = fromPath("bar-module/module.yaml");
+    StatefulFunctionModule module = fromPath(modulePath);
 
     StatefulFunctionsUniverse universe = emptyUniverse();
     module.configure(Collections.emptyMap(), universe);
@@ -59,7 +77,7 @@ public class JsonModuleTest {
 
   @Test
   public void testRouters() {
-    StatefulFunctionModule module = fromPath("bar-module/module.yaml");
+    StatefulFunctionModule module = fromPath(modulePath);
 
     StatefulFunctionsUniverse universe = emptyUniverse();
     module.configure(Collections.emptyMap(), universe);
@@ -71,7 +89,7 @@ public class JsonModuleTest {
 
   @Test
   public void testIngresses() {
-    StatefulFunctionModule module = fromPath("bar-module/module.yaml");
+    StatefulFunctionModule module = fromPath(modulePath);
 
     StatefulFunctionsUniverse universe = emptyUniverse();
     module.configure(Collections.emptyMap(), universe);
@@ -83,7 +101,7 @@ public class JsonModuleTest {
 
   @Test
   public void testEgresses() {
-    StatefulFunctionModule module = fromPath("bar-module/module.yaml");
+    StatefulFunctionModule module = fromPath(modulePath);
 
     StatefulFunctionsUniverse universe = emptyUniverse();
     module.configure(Collections.emptyMap(), universe);
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v1_0/module.yaml
similarity index 100%
copy from statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
copy to statefun-flink/statefun-flink-core/src/test/resources/module-v1_0/module.yaml
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
similarity index 95%
rename from statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
rename to statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
index 0afafc6..46ce6c1 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version: "1.0"
+version: "2.0"
 
 module:
   meta:
@@ -34,7 +34,8 @@ module:
           spec:
             endpoint: http://localhost:5959/statefun
             states:
-              - seen_count
+              - name: seen_count
+                expireAfter: 60000millisecond
             maxNumBatchRequests: 10000
       - function:
           meta:
@@ -43,7 +44,7 @@ module:
           spec:
             endpoint: http+unix:///hello/world.sock/statefun
             states:
-              - seen_count
+              - name: seen_count
             maxNumBatchRequests: 10000
     routers:
       - router:


[flink-statefun] 06/10: [FLINK-17875] [core] Wire in StateSpec into RequestReplyFunction

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit b852dcc1e02844a7fe123a61f84c37bc4619ed89
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 27 12:18:42 2020 +0800

    [FLINK-17875] [core] Wire in StateSpec into RequestReplyFunction
---
 .../flink/core/reqreply/RequestReplyFunction.java  | 39 +++++++++++++++++-----
 .../core/reqreply/RequestReplyFunctionTest.java    |  3 +-
 2 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index 6687ed0..8cde450 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse;
@@ -40,6 +41,7 @@ import org.apache.flink.statefun.sdk.Context;
 import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.state.Expiration;
 import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
 import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
@@ -47,7 +49,7 @@ import org.apache.flink.statefun.sdk.state.PersistedValue;
 public final class RequestReplyFunction implements StatefulFunction {
 
   private final RequestReplyClient client;
-  private final List<String> registeredStateNames;
+  private final List<StateSpec> registeredStates;
   private final int maxNumBatchRequests;
 
   /**
@@ -69,15 +71,17 @@ public final class RequestReplyFunction implements StatefulFunction {
   private final PersistedAppendingBuffer<ToFunction.Invocation> batch =
       PersistedAppendingBuffer.of("batch", ToFunction.Invocation.class);
 
-  @Persisted
-  private final PersistedTable<String, byte[]> managedStates =
-      PersistedTable.of("states", String.class, byte[].class);
+  @Persisted private final PersistedTable<String, byte[]> managedStates;
 
   public RequestReplyFunction(
-      List<String> registeredStateNames, int maxNumBatchRequests, RequestReplyClient client) {
+      List<StateSpec> registeredStates, int maxNumBatchRequests, RequestReplyClient client) {
     this.client = Objects.requireNonNull(client);
-    this.registeredStateNames = Objects.requireNonNull(registeredStateNames);
+    this.registeredStates = Objects.requireNonNull(registeredStates);
     this.maxNumBatchRequests = maxNumBatchRequests;
+
+    this.managedStates =
+        PersistedTable.of(
+            "states", String.class, byte[].class, resolveStateTtlExpiration(registeredStates));
   }
 
   @Override
@@ -92,6 +96,23 @@ public final class RequestReplyFunction implements StatefulFunction {
     onAsyncResult(context, result);
   }
 
+  private static Expiration resolveStateTtlExpiration(List<StateSpec> stateSpecs) {
+    // TODO applying the below limitations due to state multiplexing (see FLINK-17954)
+    // TODO 1) use the max TTL duration across all state, 2) only allow AFTER_READ_AND_WRITE
+
+    Duration maxDuration = Duration.ZERO;
+    for (StateSpec stateSpec : stateSpecs) {
+      if (stateSpec.ttlDuration().compareTo(maxDuration) > 0) {
+        maxDuration = stateSpec.ttlDuration();
+      }
+    }
+
+    if (maxDuration.equals(Duration.ZERO)) {
+      return Expiration.none();
+    }
+    return Expiration.expireAfterReadingOrWriting(maxDuration);
+  }
+
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
     int inflightOrBatched = requestState.getOrDefault(-1);
@@ -207,11 +228,11 @@ public final class RequestReplyFunction implements StatefulFunction {
   // --------------------------------------------------------------------------------
 
   private void addStates(ToFunction.InvocationBatchRequest.Builder batchBuilder) {
-    for (String stateName : registeredStateNames) {
+    for (StateSpec stateSpec : registeredStates) {
       ToFunction.PersistedValue.Builder valueBuilder =
-          ToFunction.PersistedValue.newBuilder().setStateName(stateName);
+          ToFunction.PersistedValue.newBuilder().setStateName(stateSpec.name());
 
-      byte[] stateValue = managedStates.get(stateName);
+      byte[] stateValue = managedStates.get(stateSpec.name());
       if (stateValue != null) {
         valueBuilder.setStateValue(ByteString.copyFrom(stateValue));
       }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index 04d389c..e934399 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import org.apache.flink.statefun.flink.core.TestUtils;
 import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.DelayedInvocation;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
@@ -58,7 +59,7 @@ public class RequestReplyFunctionTest {
 
   private final FakeClient client = new FakeClient();
   private final FakeContext context = new FakeContext();
-  private final List<String> states = Collections.singletonList("session");
+  private final List<StateSpec> states = Collections.singletonList(new StateSpec("session"));
 
   private final RequestReplyFunction functionUnderTest =
       new RequestReplyFunction(states, 10, client);
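
Note on the TTL resolution in this commit: resolveStateTtlExpiration picks the largest TTL across all registered states and applies it to the single "states" table, so a state that declares a shorter TTL (or none) appears to inherit the longest one. The following is a minimal, self-contained sketch of that max-TTL rule only. The class TtlResolutionSketch, its nested State type, and the Optional<Duration> return value are assumptions made for illustration; the actual commit uses StateSpec and returns an Expiration.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class TtlResolutionSketch {

  // Hypothetical stand-in for StateSpec: a name plus a TTL, where ZERO means "no TTL declared".
  static final class State {
    final String name;
    final Duration ttl;

    State(String name, Duration ttl) {
      this.name = name;
      this.ttl = ttl;
    }
  }

  // Returns the largest TTL across all states, or empty if no state declares a TTL.
  static Optional<Duration> resolveMaxTtl(List<State> states) {
    Duration max = Duration.ZERO;
    for (State state : states) {
      if (state.ttl.compareTo(max) > 0) {
        max = state.ttl;
      }
    }
    return max.isZero() ? Optional.empty() : Optional.of(max);
  }

  public static void main(String[] args) {
    List<State> states =
        Arrays.asList(
            new State("seen_count", Duration.ofSeconds(60)),
            new State("session", Duration.ZERO),
            new State("last_login", Duration.ofMinutes(5)));

    // The five-minute TTL wins; an empty list yields no expiration at all.
    System.out.println(resolveMaxTtl(states));
    System.out.println(resolveMaxTtl(Collections.<State>emptyList()));
  }
}
```

Under this rule, "session" in the example would be governed by the five-minute TTL as well, which matches the limitation the TODO comments attribute to state multiplexing (FLINK-17954).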


[flink-statefun] 08/10: [FLINK-17875] [core] Update FunctionJsonEntity to recognize V2 state format

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit d8a221078e88fe082a411f19ff7ad3f66e028324
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 28 15:20:47 2020 +0800

    [FLINK-17875] [core] Update FunctionJsonEntity to recognize V2 state format
---
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 62 +++++++++++++++++++---
 1 file changed, 55 insertions(+), 7 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
index 9b11f00..78bc18d 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -26,6 +26,7 @@ import static org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -34,6 +35,7 @@ import java.util.OptionalInt;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collector;
+import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
@@ -44,6 +46,7 @@ import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider;
 import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
+import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
@@ -68,12 +71,17 @@ final class FunctionJsonEntity implements JsonEntity {
         JsonPointer.compile("/function/spec/maxNumBatchRequests");
   }
 
+  private static final class StateSpecPointers {
+    private static final JsonPointer NAME = JsonPointer.compile("/name");
+    private static final JsonPointer EXPIRE_DURATION = JsonPointer.compile("/expireAfter");
+  }
+
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode);
 
     for (Map.Entry<Kind, Map<FunctionType, FunctionSpec>> entry :
-        parse(functionSpecNodes).entrySet()) {
+        parse(functionSpecNodes, formatVersion).entrySet()) {
       StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue());
       Set<FunctionType> functionTypes = entry.getValue().keySet();
       for (FunctionType type : functionTypes) {
@@ -83,9 +91,9 @@ final class FunctionJsonEntity implements JsonEntity {
   }
 
   private Map<Kind, Map<FunctionType, FunctionSpec>> parse(
-      Iterable<? extends JsonNode> functionSpecNodes) {
+      Iterable<? extends JsonNode> functionSpecNodes, FormatVersion formatVersion) {
     return StreamSupport.stream(functionSpecNodes.spliterator(), false)
-        .map(FunctionJsonEntity::parseFunctionSpec)
+        .map(functionSpecNode -> parseFunctionSpec(functionSpecNode, formatVersion))
         .collect(groupingBy(FunctionSpec::kind, groupByFunctionType()));
   }
 
@@ -93,7 +101,8 @@ final class FunctionJsonEntity implements JsonEntity {
     return Selectors.listAt(moduleSpecRootNode, FUNCTION_SPECS_POINTER);
   }
 
-  private static FunctionSpec parseFunctionSpec(JsonNode functionNode) {
+  private static FunctionSpec parseFunctionSpec(
+      JsonNode functionNode, FormatVersion formatVersion) {
     String functionKind = Selectors.textAt(functionNode, MetaPointers.KIND);
     FunctionSpec.Kind kind =
         FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.getDefault()));
@@ -103,7 +112,9 @@ final class FunctionJsonEntity implements JsonEntity {
         final HttpFunctionSpec.Builder specBuilder =
             HttpFunctionSpec.builder(functionType, functionUri(functionNode));
 
-        for (String state : functionStates(functionNode)) {
+        final Function<JsonNode, List<StateSpec>> stateSpecParser =
+            functionStateParserOf(formatVersion);
+        for (StateSpec state : stateSpecParser.apply(functionNode)) {
           specBuilder.withState(state);
         }
         optionalMaxNumBatchRequests(functionNode).ifPresent(specBuilder::withMaxNumBatchRequests);
@@ -117,8 +128,40 @@ final class FunctionJsonEntity implements JsonEntity {
     }
   }
 
-  private static List<String> functionStates(JsonNode functionNode) {
-    return Selectors.textListAt(functionNode, SpecPointers.STATES);
+  private static Function<JsonNode, List<StateSpec>> functionStateParserOf(
+      FormatVersion formatVersion) {
+    switch (formatVersion) {
+      case v1_0:
+        return FunctionJsonEntity::functionStateSpecParserV1;
+      case v2_0:
+        return FunctionJsonEntity::functionStateSpecParserV2;
+      default:
+        throw new IllegalStateException("Unrecognized format version: " + formatVersion);
+    }
+  }
+
+  private static List<StateSpec> functionStateSpecParserV1(JsonNode functionNode) {
+    final List<String> stateNames = Selectors.textListAt(functionNode, SpecPointers.STATES);
+    return stateNames.stream().map(StateSpec::new).collect(Collectors.toList());
+  }
+
+  private static List<StateSpec> functionStateSpecParserV2(JsonNode functionNode) {
+    final Iterable<? extends JsonNode> stateSpecNodes =
+        Selectors.listAt(functionNode, SpecPointers.STATES);
+    final List<StateSpec> stateSpecs = new ArrayList<>();
+
+    stateSpecNodes.forEach(
+        stateSpecNode -> {
+          final String name = Selectors.textAt(stateSpecNode, StateSpecPointers.NAME);
+          final Optional<Duration> optionalStateExpireDuration =
+              optionalStateExpireDuration(stateSpecNode);
+          if (optionalStateExpireDuration.isPresent()) {
+            stateSpecs.add(new StateSpec(name, optionalStateExpireDuration.get()));
+          } else {
+            stateSpecs.add(new StateSpec(name));
+          }
+        });
+    return stateSpecs;
   }
 
   private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) {
@@ -130,6 +173,11 @@ final class FunctionJsonEntity implements JsonEntity {
         .map(TimeUtils::parseDuration);
   }
 
+  private static Optional<Duration> optionalStateExpireDuration(JsonNode stateSpecNode) {
+    return Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_DURATION)
+        .map(TimeUtils::parseDuration);
+  }
+
   private static FunctionType functionType(JsonNode functionNode) {
     String namespaceName = Selectors.textAt(functionNode, MetaPointers.TYPE);
     NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);
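
Note on the version dispatch in this commit: functionStateParserOf selects a parser by format version, so V1 modules keep listing bare state names while V2 modules list objects with a name and an optional expiry. The sketch below illustrates that dispatch pattern using only the JDK. Everything in it is hypothetical: the class StateFormatDispatchSketch, the Version enum, the plain List/Map inputs standing in for JsonNode, and the numeric "expireAfterMillis" field, which replaces the textual expireAfter value that the commit parses with TimeUtils.parseDuration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class StateFormatDispatchSketch {

  // Hypothetical stand-in for FormatVersion.
  enum Version {
    V1_0,
    V2_0
  }

  // Hypothetical stand-in for StateSpec; Duration.ZERO means "no TTL declared".
  static final class State {
    final String name;
    final Duration ttl;

    State(String name, Duration ttl) {
      this.name = name;
      this.ttl = ttl;
    }
  }

  // Chooses the parser for the raw "states" list based on the declared format version.
  static Function<List<?>, List<State>> parserOf(Version version) {
    switch (version) {
      case V1_0:
        return StateFormatDispatchSketch::parseV1;
      case V2_0:
        return StateFormatDispatchSketch::parseV2;
      default:
        throw new IllegalStateException("Unrecognized format version: " + version);
    }
  }

  // V1: every entry is a bare state name, so no TTL can be declared.
  static List<State> parseV1(List<?> raw) {
    List<State> states = new ArrayList<>();
    for (Object entry : raw) {
      states.add(new State((String) entry, Duration.ZERO));
    }
    return states;
  }

  // V2: every entry is a map with a "name" and an optional (assumed) "expireAfterMillis".
  static List<State> parseV2(List<?> raw) {
    List<State> states = new ArrayList<>();
    for (Object entry : raw) {
      Map<?, ?> fields = (Map<?, ?>) entry;
      String name = (String) fields.get("name");
      Object millis = fields.get("expireAfterMillis");
      Duration ttl =
          millis == null ? Duration.ZERO : Duration.ofMillis(((Number) millis).longValue());
      states.add(new State(name, ttl));
    }
    return states;
  }

  public static void main(String[] args) {
    List<State> v1 = parserOf(Version.V1_0).apply(Arrays.asList("seen_count", "session"));

    Map<String, Object> withTtl = new HashMap<>();
    withTtl.put("name", "seen_count");
    withTtl.put("expireAfterMillis", 60000L);
    List<Object> rawV2 = new ArrayList<>();
    rawV2.add(withTtl);
    rawV2.add(Collections.singletonMap("name", "session"));
    List<State> v2 = parserOf(Version.V2_0).apply(rawV2);

    for (State state : v1) {
      System.out.println("v1 " + state.name + " ttl=" + state.ttl);
    }
    for (State state : v2) {
      System.out.println("v2 " + state.name + " ttl=" + state.ttl);
    }
  }
}
```

Returning a Function from the switch keeps the version check out of the per-function parsing loop, which is the same shape the commit uses in parseFunctionSpec.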


[flink-statefun] 05/10: [FLINK-17875] [core] Introduce StateSpec for remote function specs

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 00233601a0d6933c71b65eba828562f75dd0c7d0
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 27 12:05:38 2020 +0800

    [FLINK-17875] [core] Introduce StateSpec for remote function specs
    
    The StateSpec class represents the state name and TTL duration as
    specified in YAML remote modules.
---
 .../flink/core/httpfn/HttpFunctionSpec.java        | 12 +++---
 .../statefun/flink/core/httpfn/StateSpec.java      | 44 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
index 0e03591..868e542 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
@@ -32,14 +32,14 @@ public final class HttpFunctionSpec implements FunctionSpec {
 
   private final FunctionType functionType;
   private final URI endpoint;
-  private final List<String> states;
+  private final List<StateSpec> states;
   private final Duration maxRequestDuration;
   private final int maxNumBatchRequests;
 
   private HttpFunctionSpec(
       FunctionType functionType,
       URI endpoint,
-      List<String> states,
+      List<StateSpec> states,
       Duration maxRequestDuration,
       int maxNumBatchRequests) {
     this.functionType = Objects.requireNonNull(functionType);
@@ -72,7 +72,7 @@ public final class HttpFunctionSpec implements FunctionSpec {
     return "http+unix".equalsIgnoreCase(scheme) || "https+unix".equalsIgnoreCase(scheme);
   }
 
-  public List<String> states() {
+  public List<StateSpec> states() {
     return states;
   }
 
@@ -89,7 +89,7 @@ public final class HttpFunctionSpec implements FunctionSpec {
     private final FunctionType functionType;
     private final URI endpoint;
 
-    private final List<String> states = new ArrayList<>();
+    private final List<StateSpec> states = new ArrayList<>();
     private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT;
     private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
 
@@ -98,8 +98,8 @@ public final class HttpFunctionSpec implements FunctionSpec {
       this.endpoint = Objects.requireNonNull(endpoint);
     }
 
-    public Builder withState(String stateName) {
-      this.states.add(stateName);
+    public Builder withState(StateSpec stateSpec) {
+      this.states.add(stateSpec);
       return this;
     }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
new file mode 100644
index 0000000..7748fec
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import java.time.Duration;
+import java.util.Objects;
+
+public final class StateSpec {
+  private final String name;
+  private final Duration ttlDuration;
+
+  public StateSpec(String name) {
+    this(name, Duration.ZERO);
+  }
+
+  public StateSpec(String name, Duration ttlDuration) {
+    this.name = Objects.requireNonNull(name);
+    this.ttlDuration = Objects.requireNonNull(ttlDuration);
+  }
+
+  public String name() {
+    return name;
+  }
+
+  public Duration ttlDuration() {
+    return ttlDuration;
+  }
+}


[flink-statefun] 04/10: [FLINK-17875] [core] Move JSON Pointers to individual entities

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 1fa4664cdc29c93d7ec4b30f37de712d18505140
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 28 14:36:53 2020 +0800

    [FLINK-17875] [core] Move JSON Pointers to individual entities
    
    Now that the refactoring places the responsibility for parsing each
    entity in the individual JsonEntity implementation, it also makes sense
    not to keep all JSON Pointers in a single file, but instead to move them
    closer to the JsonEntities that actually use them.
---
 .../flink/core/jsonmodule/EgressJsonEntity.java    | 16 ++--
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 37 ++++++---
 .../flink/core/jsonmodule/IngressJsonEntity.java   | 14 +++-
 .../flink/core/jsonmodule/JsonServiceLoader.java   | 11 ++-
 .../statefun/flink/core/jsonmodule/Pointers.java   | 87 ----------------------
 .../flink/core/jsonmodule/RouterJsonEntity.java    | 26 +++++--
 6 files changed, 77 insertions(+), 114 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
index 766d936..813b740 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
-import static org.apache.flink.statefun.flink.core.jsonmodule.Pointers.EGRESSES_POINTER;
-
 import com.google.protobuf.Any;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
 import org.apache.flink.statefun.flink.common.json.Selectors;
@@ -31,10 +30,17 @@ import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
 
 final class EgressJsonEntity implements JsonEntity {
 
+  private static final JsonPointer EGRESS_SPECS_POINTER = JsonPointer.compile("/egresses");
+
+  private static final class MetaPointers {
+    private static final JsonPointer ID = JsonPointer.compile("/egress/meta/id");
+    private static final JsonPointer TYPE = JsonPointer.compile("/egress/meta/type");
+  }
+
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> egressNodes =
-        Selectors.listAt(moduleSpecRootNode, EGRESSES_POINTER);
+        Selectors.listAt(moduleSpecRootNode, EGRESS_SPECS_POINTER);
 
     egressNodes.forEach(
         egressNode -> {
@@ -44,13 +50,13 @@ final class EgressJsonEntity implements JsonEntity {
   }
 
   private static EgressType egressType(JsonNode spec) {
-    String typeString = Selectors.textAt(spec, Pointers.Egress.META_TYPE);
+    String typeString = Selectors.textAt(spec, MetaPointers.TYPE);
     NamespaceNamePair nn = NamespaceNamePair.from(typeString);
     return new EgressType(nn.namespace(), nn.name());
   }
 
   private static EgressIdentifier<Any> egressId(JsonNode spec) {
-    String egressId = Selectors.textAt(spec, Pointers.Egress.META_ID);
+    String egressId = Selectors.textAt(spec, MetaPointers.ID);
     NamespaceNamePair nn = NamespaceNamePair.from(egressId);
     return new EgressIdentifier<>(nn.namespace(), nn.name(), Any.class);
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
index 7dc5173..9b11f00 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -36,6 +36,7 @@ import java.util.function.Function;
 import java.util.stream.Collector;
 import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
 import org.apache.flink.statefun.flink.common.json.Selectors;
@@ -50,6 +51,23 @@ import org.apache.flink.util.TimeUtils;
 
 final class FunctionJsonEntity implements JsonEntity {
 
+  private static final JsonPointer FUNCTION_SPECS_POINTER = JsonPointer.compile("/functions");
+
+  private static final class MetaPointers {
+    private static final JsonPointer KIND = JsonPointer.compile("/function/meta/kind");
+    private static final JsonPointer TYPE = JsonPointer.compile("/function/meta/type");
+  }
+
+  private static final class SpecPointers {
+    private static final JsonPointer HOSTNAME = JsonPointer.compile("/function/spec/host");
+    private static final JsonPointer ENDPOINT = JsonPointer.compile("/function/spec/endpoint");
+    private static final JsonPointer PORT = JsonPointer.compile("/function/spec/port");
+    private static final JsonPointer STATES = JsonPointer.compile("/function/spec/states");
+    private static final JsonPointer TIMEOUT = JsonPointer.compile("/function/spec/timeout");
+    private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
+        JsonPointer.compile("/function/spec/maxNumBatchRequests");
+  }
+
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode);
@@ -72,11 +90,11 @@ final class FunctionJsonEntity implements JsonEntity {
   }
 
   private static Iterable<? extends JsonNode> functionSpecNodes(JsonNode moduleSpecRootNode) {
-    return Selectors.listAt(moduleSpecRootNode, Pointers.FUNCTIONS_POINTER);
+    return Selectors.listAt(moduleSpecRootNode, FUNCTION_SPECS_POINTER);
   }
 
   private static FunctionSpec parseFunctionSpec(JsonNode functionNode) {
-    String functionKind = Selectors.textAt(functionNode, Pointers.Functions.META_KIND);
+    String functionKind = Selectors.textAt(functionNode, MetaPointers.KIND);
     FunctionSpec.Kind kind =
         FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.getDefault()));
     FunctionType functionType = functionType(functionNode);
@@ -100,33 +118,32 @@ final class FunctionJsonEntity implements JsonEntity {
   }
 
   private static List<String> functionStates(JsonNode functionNode) {
-    return Selectors.textListAt(functionNode, Pointers.Functions.FUNCTION_STATES);
+    return Selectors.textListAt(functionNode, SpecPointers.STATES);
   }
 
   private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) {
-    return Selectors.optionalIntegerAt(
-        functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS);
+    return Selectors.optionalIntegerAt(functionNode, SpecPointers.MAX_NUM_BATCH_REQUESTS);
   }
 
   private static Optional<Duration> optionalMaxRequestDuration(JsonNode functionNode) {
-    return Selectors.optionalTextAt(functionNode, Pointers.Functions.FUNCTION_TIMEOUT)
+    return Selectors.optionalTextAt(functionNode, SpecPointers.TIMEOUT)
         .map(TimeUtils::parseDuration);
   }
 
   private static FunctionType functionType(JsonNode functionNode) {
-    String namespaceName = Selectors.textAt(functionNode, Pointers.Functions.META_TYPE);
+    String namespaceName = Selectors.textAt(functionNode, MetaPointers.TYPE);
     NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);
     return new FunctionType(nn.namespace(), nn.name());
   }
 
   private static InetSocketAddress functionAddress(JsonNode functionNode) {
-    String host = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_HOSTNAME);
-    int port = Selectors.integerAt(functionNode, Pointers.Functions.FUNCTION_PORT);
+    String host = Selectors.textAt(functionNode, SpecPointers.HOSTNAME);
+    int port = Selectors.integerAt(functionNode, SpecPointers.PORT);
     return new InetSocketAddress(host, port);
   }
 
   private static URI functionUri(JsonNode functionNode) {
-    String uri = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_ENDPOINT);
+    String uri = Selectors.textAt(functionNode, SpecPointers.ENDPOINT);
     URI typedUri = URI.create(uri);
     @Nullable String scheme = typedUri.getScheme();
     if (scheme == null) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
index d30675c..83c5d00 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
@@ -19,6 +19,7 @@
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
 import com.google.protobuf.Message;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
 import org.apache.flink.statefun.flink.common.json.Selectors;
@@ -32,10 +33,17 @@ import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
 
 final class IngressJsonEntity implements JsonEntity {
 
+  private static final JsonPointer INGRESS_SPECS_POINTER = JsonPointer.compile("/ingresses");
+
+  private static final class MetaPointers {
+    private static final JsonPointer ID = JsonPointer.compile("/ingress/meta/id");
+    private static final JsonPointer TYPE = JsonPointer.compile("/ingress/meta/type");
+  }
+
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> ingressNodes =
-        Selectors.listAt(moduleSpecRootNode, Pointers.INGRESSES_POINTER);
+        Selectors.listAt(moduleSpecRootNode, INGRESS_SPECS_POINTER);
 
     ingressNodes.forEach(
         ingressNode -> {
@@ -50,13 +58,13 @@ final class IngressJsonEntity implements JsonEntity {
   }
 
   private static IngressType ingressType(JsonNode spec) {
-    String typeString = Selectors.textAt(spec, Pointers.Ingress.META_TYPE);
+    String typeString = Selectors.textAt(spec, MetaPointers.TYPE);
     NamespaceNamePair nn = NamespaceNamePair.from(typeString);
     return new IngressType(nn.namespace(), nn.name());
   }
 
   private static IngressIdentifier<Message> ingressId(JsonNode ingress) {
-    String ingressId = Selectors.textAt(ingress, Pointers.Ingress.META_ID);
+    String ingressId = Selectors.textAt(ingress, MetaPointers.ID);
     NamespaceNamePair nn = NamespaceNamePair.from(ingressId);
     return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
index 84e3eba..a507540 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
@@ -22,6 +22,7 @@ import java.net.URL;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -32,6 +33,10 @@ import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 
 public final class JsonServiceLoader {
 
+  private static final JsonPointer FORMAT_VERSION = JsonPointer.compile("/version");
+  private static final JsonPointer MODULE_META_TYPE = JsonPointer.compile("/module/meta/type");
+  private static final JsonPointer MODULE_SPEC = JsonPointer.compile("/module/spec");
+
   public static Iterable<StatefulFunctionModule> load() {
     ObjectMapper mapper = mapper();
 
@@ -68,7 +73,7 @@ public final class JsonServiceLoader {
   }
 
   private static void validateMeta(URL moduleYamlFile, JsonNode root) {
-    JsonNode typeNode = root.at(Pointers.MODULE_META_TYPE);
+    JsonNode typeNode = root.at(MODULE_META_TYPE);
     if (typeNode.isMissingNode()) {
       throw new IllegalStateException("Unable to find a module type in " + moduleYamlFile);
     }
@@ -82,7 +87,7 @@ public final class JsonServiceLoader {
   }
 
   private static JsonNode requireValidModuleSpecNode(URL moduleYamlFile, JsonNode root) {
-    final JsonNode moduleSpecNode = root.at(Pointers.MODULE_SPEC);
+    final JsonNode moduleSpecNode = root.at(MODULE_SPEC);
 
     if (moduleSpecNode.isMissingNode()) {
       throw new IllegalStateException("A module without a spec at " + moduleYamlFile);
@@ -92,7 +97,7 @@ public final class JsonServiceLoader {
   }
 
   private static FormatVersion requireValidFormatVersion(JsonNode root) {
-    final String formatVersion = Selectors.textAt(root, Pointers.FORMAT_VERSION);
+    final String formatVersion = Selectors.textAt(root, FORMAT_VERSION);
     return FormatVersion.fromString(formatVersion);
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
deleted file mode 100644
index 5c5b973..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.jsonmodule;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
-
-public final class Pointers {
-
-  private Pointers() {}
-
-  public static final JsonPointer FORMAT_VERSION = JsonPointer.compile("/version");
-
-  // -------------------------------------------------------------------------------------
-  // top level spec definition
-  // -------------------------------------------------------------------------------------
-
-  public static final JsonPointer MODULE_META_TYPE = JsonPointer.compile("/module/meta/type");
-  public static final JsonPointer MODULE_SPEC = JsonPointer.compile("/module/spec");
-  public static final JsonPointer FUNCTIONS_POINTER = JsonPointer.compile("/functions");
-  public static final JsonPointer ROUTERS_POINTER = JsonPointer.compile("/routers");
-  public static final JsonPointer INGRESSES_POINTER = JsonPointer.compile("/ingresses");
-  public static final JsonPointer EGRESSES_POINTER = JsonPointer.compile("/egresses");
-
-  // -------------------------------------------------------------------------------------
-  // function
-  // -------------------------------------------------------------------------------------
-
-  public static final class Functions {
-    public static final JsonPointer META_KIND = JsonPointer.compile("/function/meta/kind");
-    public static final JsonPointer META_TYPE = JsonPointer.compile("/function/meta/type");
-    public static final JsonPointer FUNCTION_HOSTNAME = JsonPointer.compile("/function/spec/host");
-    public static final JsonPointer FUNCTION_ENDPOINT =
-        JsonPointer.compile("/function/spec/endpoint");
-    public static final JsonPointer FUNCTION_PORT = JsonPointer.compile("/function/spec/port");
-    public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
-    public static final JsonPointer FUNCTION_TIMEOUT =
-        JsonPointer.compile("/function/spec/timeout");
-    public static final JsonPointer FUNCTION_MAX_NUM_BATCH_REQUESTS =
-        JsonPointer.compile("/function/spec/maxNumBatchRequests");
-  }
-
-  // -------------------------------------------------------------------------------------
-  // routers
-  // -------------------------------------------------------------------------------------
-
-  public static final class Routers {
-
-    public static final JsonPointer META_TYPE = JsonPointer.compile("/router/meta/type");
-    public static final JsonPointer SPEC_INGRESS = JsonPointer.compile("/router/spec/ingress");
-    public static final JsonPointer SPEC_TARGET = JsonPointer.compile("/router/spec/target");
-    public static final JsonPointer SPEC_DESCRIPTOR =
-        JsonPointer.compile("/router/spec/descriptorSet");
-    public static final JsonPointer SPEC_MESSAGE_TYPE =
-        JsonPointer.compile("/router/spec/messageType");
-  }
-
-  // -------------------------------------------------------------------------------------
-  // ingresses
-  // -------------------------------------------------------------------------------------
-
-  public static final class Ingress {
-
-    public static final JsonPointer META_ID = JsonPointer.compile("/ingress/meta/id");
-    public static final JsonPointer META_TYPE = JsonPointer.compile("/ingress/meta/type");
-  }
-
-  public static final class Egress {
-
-    public static final JsonPointer META_ID = JsonPointer.compile("/egress/meta/id");
-    public static final JsonPointer META_TYPE = JsonPointer.compile("/egress/meta/type");
-  }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
index ee0ad47..30c3ff2 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
@@ -23,6 +23,7 @@ import com.google.protobuf.Message;
 import java.io.IOException;
 import java.net.URL;
 import java.util.Optional;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.statefun.flink.common.ResourceLocator;
 import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
@@ -35,10 +36,23 @@ import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
 
 final class RouterJsonEntity implements JsonEntity {
 
+  private static final JsonPointer ROUTER_SPECS_POINTER = JsonPointer.compile("/routers");
+
+  private static final class MetaPointers {
+    private static final JsonPointer TYPE = JsonPointer.compile("/router/meta/type");
+  }
+
+  private static final class SpecPointers {
+    private static final JsonPointer INGRESS = JsonPointer.compile("/router/spec/ingress");
+    private static final JsonPointer TARGET = JsonPointer.compile("/router/spec/target");
+    private static final JsonPointer DESCRIPTOR = JsonPointer.compile("/router/spec/descriptorSet");
+    private static final JsonPointer MESSAGE_TYPE = JsonPointer.compile("/router/spec/messageType");
+  }
+
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> routerNodes =
-        Selectors.listAt(moduleSpecRootNode, Pointers.ROUTERS_POINTER);
+        Selectors.listAt(moduleSpecRootNode, ROUTER_SPECS_POINTER);
 
     routerNodes.forEach(
         routerNode -> {
@@ -57,9 +71,9 @@ final class RouterJsonEntity implements JsonEntity {
   // ----------------------------------------------------------------------------------------------------------
 
   private static Router<Message> dynamicRouter(JsonNode router) {
-    String addressTemplate = Selectors.textAt(router, Pointers.Routers.SPEC_TARGET);
-    String descriptorSetPath = Selectors.textAt(router, Pointers.Routers.SPEC_DESCRIPTOR);
-    String messageType = Selectors.textAt(router, Pointers.Routers.SPEC_MESSAGE_TYPE);
+    String addressTemplate = Selectors.textAt(router, SpecPointers.TARGET);
+    String descriptorSetPath = Selectors.textAt(router, SpecPointers.DESCRIPTOR);
+    String messageType = Selectors.textAt(router, SpecPointers.MESSAGE_TYPE);
 
     ProtobufDescriptorMap descriptorPath = protobufDescriptorMap(descriptorSetPath);
     Optional<Descriptors.GenericDescriptor> maybeDescriptor =
@@ -92,13 +106,13 @@ final class RouterJsonEntity implements JsonEntity {
   }
 
   private static IngressIdentifier<Message> targetRouterIngress(JsonNode routerNode) {
-    String targetIngress = Selectors.textAt(routerNode, Pointers.Routers.SPEC_INGRESS);
+    String targetIngress = Selectors.textAt(routerNode, SpecPointers.INGRESS);
     NamespaceNamePair nn = NamespaceNamePair.from(targetIngress);
     return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
   }
 
   private static void requireProtobufRouterType(JsonNode routerNode) {
-    String routerType = Selectors.textAt(routerNode, Pointers.Routers.META_TYPE);
+    String routerType = Selectors.textAt(routerNode, MetaPointers.TYPE);
     if (!routerType.equalsIgnoreCase("org.apache.flink.statefun.sdk/protobuf-router")) {
       throw new IllegalStateException("Invalid router type " + routerType);
     }


[flink-statefun] 03/10: [FLINK-17875] [core] Move default function config values to HttpFunctionSpec

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 4a920f913eb652183021be727203033529570d90
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 28 14:27:52 2020 +0800

    [FLINK-17875] [core] Move default function config values to HttpFunctionSpec
---
 .../flink/core/httpfn/HttpFunctionSpec.java        | 46 +++++++++++++++++++++-
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 31 ++++++++-------
 2 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
index 61945f4..0e03591 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
@@ -19,19 +19,24 @@ package org.apache.flink.statefun.flink.core.httpfn;
 
 import java.net.URI;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
 
 public final class HttpFunctionSpec implements FunctionSpec {
+
+  private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
+  private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
+
   private final FunctionType functionType;
   private final URI endpoint;
   private final List<String> states;
   private final Duration maxRequestDuration;
   private final int maxNumBatchRequests;
 
-  public HttpFunctionSpec(
+  private HttpFunctionSpec(
       FunctionType functionType,
       URI endpoint,
       List<String> states,
@@ -44,6 +49,10 @@ public final class HttpFunctionSpec implements FunctionSpec {
     this.maxNumBatchRequests = maxNumBatchRequests;
   }
 
+  public static Builder builder(FunctionType functionType, URI endpoint) {
+    return new Builder(functionType, endpoint);
+  }
+
   @Override
   public FunctionType functionType() {
     return functionType;
@@ -74,4 +83,39 @@ public final class HttpFunctionSpec implements FunctionSpec {
   public int maxNumBatchRequests() {
     return maxNumBatchRequests;
   }
+
+  public static final class Builder {
+
+    private final FunctionType functionType;
+    private final URI endpoint;
+
+    private final List<String> states = new ArrayList<>();
+    private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT;
+    private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
+
+    private Builder(FunctionType functionType, URI endpoint) {
+      this.functionType = Objects.requireNonNull(functionType);
+      this.endpoint = Objects.requireNonNull(endpoint);
+    }
+
+    public Builder withState(String stateName) {
+      this.states.add(stateName);
+      return this;
+    }
+
+    public Builder withMaxRequestDuration(Duration duration) {
+      this.maxRequestDuration = Objects.requireNonNull(duration);
+      return this;
+    }
+
+    public Builder withMaxNumBatchRequests(int maxNumBatchRequests) {
+      this.maxNumBatchRequests = maxNumBatchRequests;
+      return this;
+    }
+
+    public HttpFunctionSpec build() {
+      return new HttpFunctionSpec(
+          functionType, endpoint, states, maxRequestDuration, maxNumBatchRequests);
+    }
+  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
index 25f1981..7dc5173 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -29,6 +29,8 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collector;
@@ -48,9 +50,6 @@ import org.apache.flink.util.TimeUtils;
 
 final class FunctionJsonEntity implements JsonEntity {
 
-  private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
-  private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
-
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode);
@@ -83,12 +82,16 @@ final class FunctionJsonEntity implements JsonEntity {
     FunctionType functionType = functionType(functionNode);
     switch (kind) {
       case HTTP:
-        return new HttpFunctionSpec(
-            functionType,
-            functionUri(functionNode),
-            functionStates(functionNode),
-            maxRequestDuration(functionNode),
-            maxNumBatchRequests(functionNode));
+        final HttpFunctionSpec.Builder specBuilder =
+            HttpFunctionSpec.builder(functionType, functionUri(functionNode));
+
+        for (String state : functionStates(functionNode)) {
+          specBuilder.withState(state);
+        }
+        optionalMaxNumBatchRequests(functionNode).ifPresent(specBuilder::withMaxNumBatchRequests);
+        optionalMaxRequestDuration(functionNode).ifPresent(specBuilder::withMaxRequestDuration);
+
+        return specBuilder.build();
       case GRPC:
         return new GrpcFunctionSpec(functionType, functionAddress(functionNode));
       default:
@@ -100,16 +103,14 @@ final class FunctionJsonEntity implements JsonEntity {
     return Selectors.textListAt(functionNode, Pointers.Functions.FUNCTION_STATES);
   }
 
-  private static int maxNumBatchRequests(JsonNode functionNode) {
+  private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) {
     return Selectors.optionalIntegerAt(
-            functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS)
-        .orElse(DEFAULT_MAX_NUM_BATCH_REQUESTS);
+        functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS);
   }
 
-  private static Duration maxRequestDuration(JsonNode functionNode) {
+  private static Optional<Duration> optionalMaxRequestDuration(JsonNode functionNode) {
     return Selectors.optionalTextAt(functionNode, Pointers.Functions.FUNCTION_TIMEOUT)
-        .map(TimeUtils::parseDuration)
-        .orElse(DEFAULT_HTTP_TIMEOUT);
+        .map(TimeUtils::parseDuration);
   }
 
   private static FunctionType functionType(JsonNode functionNode) {


[flink-statefun] 02/10: [FLINK-17875] [core] Wire in JsonEntities into JsonModule

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 758076b0033bca61185911557041614d7bae0ada
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 28 14:13:17 2020 +0800

    [FLINK-17875] [core] Wire in JsonEntities into JsonModule
---
 .../statefun/flink/core/jsonmodule/JsonModule.java | 294 ++-------------------
 .../flink/core/jsonmodule/JsonModuleFactory.java   |  46 ----
 .../flink/core/jsonmodule/JsonServiceLoader.java   |  31 ++-
 .../flink/core/jsonmodule/JsonModuleTest.java      |  10 +-
 4 files changed, 34 insertions(+), 347 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index bd96211..cedeb97 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -18,303 +18,41 @@
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
 import static java.lang.String.format;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.toMap;
-import static org.apache.flink.statefun.flink.core.common.Maps.transformValues;
 
-import com.google.protobuf.Any;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
 import java.net.URL;
-import java.time.Duration;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collector;
-import java.util.stream.StreamSupport;
-import javax.annotation.Nullable;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.statefun.flink.common.ResourceLocator;
-import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
-import org.apache.flink.statefun.flink.common.json.Selectors;
-import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap;
-import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider;
-import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
-import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
-import org.apache.flink.statefun.flink.core.jsonmodule.Pointers.Functions;
-import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
-import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
-import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
-import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
-import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
-import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
-import org.apache.flink.statefun.sdk.EgressType;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.IngressType;
-import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-import org.apache.flink.statefun.sdk.io.IngressIdentifier;
-import org.apache.flink.statefun.sdk.io.Router;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-import org.apache.flink.util.TimeUtils;
 
 final class JsonModule implements StatefulFunctionModule {
-  private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
-  private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
-  private final JsonNode spec;
+
+  /** Entities in the JSON moduleSpecNode that should be parsed and bound to the module. */
+  private static final List<JsonEntity> ENTITIES =
+      Arrays.asList(
+          new FunctionJsonEntity(),
+          new IngressJsonEntity(),
+          new RouterJsonEntity(),
+          new EgressJsonEntity());
+
+  private final JsonNode moduleSpecNode;
+  private final FormatVersion formatVersion;
   private final URL moduleUrl;
 
-  public JsonModule(JsonNode spec, URL moduleUrl) {
-    this.spec = Objects.requireNonNull(spec);
+  public JsonModule(JsonNode moduleSpecNode, FormatVersion formatVersion, URL moduleUrl) {
+    this.moduleSpecNode = Objects.requireNonNull(moduleSpecNode);
+    this.formatVersion = Objects.requireNonNull(formatVersion);
     this.moduleUrl = Objects.requireNonNull(moduleUrl);
   }
 
   public void configure(Map<String, String> conf, Binder binder) {
     try {
-      configureFunctions(binder, Selectors.listAt(spec, Pointers.FUNCTIONS_POINTER));
-      configureRouters(binder, Selectors.listAt(spec, Pointers.ROUTERS_POINTER));
-      configureIngress(binder, Selectors.listAt(spec, Pointers.INGRESSES_POINTER));
-      configureEgress(binder, Selectors.listAt(spec, Pointers.EGRESSES_POINTER));
+      ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion));
     } catch (Throwable t) {
       throw new ModuleConfigurationException(
           format("Error while parsing module at %s", moduleUrl), t);
     }
   }
-
-  private void configureFunctions(Binder binder, Iterable<? extends JsonNode> functions) {
-    final Map<Kind, Map<FunctionType, FunctionSpec>> definedFunctions =
-        StreamSupport.stream(functions.spliterator(), false)
-            .map(JsonModule::parseFunctionSpec)
-            .collect(groupingBy(FunctionSpec::kind, groupByFunctionType()));
-
-    for (Entry<Kind, Map<FunctionType, FunctionSpec>> entry : definedFunctions.entrySet()) {
-      StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue());
-      Set<FunctionType> functionTypes = entry.getValue().keySet();
-      for (FunctionType type : functionTypes) {
-        binder.bindFunctionProvider(type, provider);
-      }
-    }
-  }
-
-  private static StatefulFunctionProvider functionProvider(
-      Kind kind, Map<FunctionType, FunctionSpec> definedFunctions) {
-    switch (kind) {
-      case HTTP:
-        return new HttpFunctionProvider(
-            transformValues(definedFunctions, HttpFunctionSpec.class::cast));
-      case GRPC:
-        return new GrpcFunctionProvider(
-            transformValues(definedFunctions, GrpcFunctionSpec.class::cast));
-      default:
-        throw new IllegalStateException("Unexpected value: " + kind);
-    }
-  }
-
-  private void configureRouters(Binder binder, Iterable<? extends JsonNode> routerNodes) {
-    for (JsonNode router : routerNodes) {
-      // currently the only type of router supported in a module.yaml, is a protobuf dynamicMessage
-      // router once we will introduce further router types we should refactor this to be more
-      // dynamic.
-      requireProtobufRouterType(router);
-
-      IngressIdentifier<Message> id = targetRouterIngress(router);
-      binder.bindIngressRouter(id, dynamicRouter(router));
-    }
-  }
-
-  private void configureIngress(Binder binder, Iterable<? extends JsonNode> ingressNode) {
-    for (JsonNode ingress : ingressNode) {
-      IngressIdentifier<Message> id = ingressId(ingress);
-      IngressType type = ingressType(ingress);
-
-      JsonIngressSpec<Message> ingressSpec = new JsonIngressSpec<>(type, id, ingress);
-      binder.bindIngress(ingressSpec);
-
-      if (isAutoRoutableIngress(type)) {
-        binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
-      }
-    }
-  }
-
-  private void configureEgress(Binder binder, Iterable<? extends JsonNode> egressNode) {
-    for (JsonNode egress : egressNode) {
-      EgressIdentifier<Any> id = egressId(egress);
-      EgressType type = egressType(egress);
-
-      JsonEgressSpec<Any> egressSpec = new JsonEgressSpec<>(type, id, egress);
-      binder.bindEgress(egressSpec);
-    }
-  }
-
-  // ----------------------------------------------------------------------------------------------------------
-  // Ingresses
-  // ----------------------------------------------------------------------------------------------------------
-  private static IngressType ingressType(JsonNode spec) {
-    String typeString = Selectors.textAt(spec, Pointers.Ingress.META_TYPE);
-    NamespaceNamePair nn = NamespaceNamePair.from(typeString);
-    return new IngressType(nn.namespace(), nn.name());
-  }
-
-  private static IngressIdentifier<Message> ingressId(JsonNode ingress) {
-    String ingressId = Selectors.textAt(ingress, Pointers.Ingress.META_ID);
-    NamespaceNamePair nn = NamespaceNamePair.from(ingressId);
-    return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
-  }
-
-  private static boolean isAutoRoutableIngress(IngressType ingressType) {
-    return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)
-        || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE);
-  }
-
-  // ----------------------------------------------------------------------------------------------------------
-  // Egresses
-  // ----------------------------------------------------------------------------------------------------------
-  private static EgressType egressType(JsonNode spec) {
-    String typeString = Selectors.textAt(spec, Pointers.Egress.META_TYPE);
-    NamespaceNamePair nn = NamespaceNamePair.from(typeString);
-    return new EgressType(nn.namespace(), nn.name());
-  }
-
-  private static EgressIdentifier<Any> egressId(JsonNode spec) {
-    String egressId = Selectors.textAt(spec, Pointers.Egress.META_ID);
-    NamespaceNamePair nn = NamespaceNamePair.from(egressId);
-    return new EgressIdentifier<>(nn.namespace(), nn.name(), Any.class);
-  }
-
-  // ----------------------------------------------------------------------------------------------------------
-  // Routers
-  // ----------------------------------------------------------------------------------------------------------
-  private static Router<Message> dynamicRouter(JsonNode router) {
-    String addressTemplate = Selectors.textAt(router, Pointers.Routers.SPEC_TARGET);
-    String descriptorSetPath = Selectors.textAt(router, Pointers.Routers.SPEC_DESCRIPTOR);
-    String messageType = Selectors.textAt(router, Pointers.Routers.SPEC_MESSAGE_TYPE);
-
-    ProtobufDescriptorMap descriptorPath = protobufDescriptorMap(descriptorSetPath);
-    Optional<Descriptors.GenericDescriptor> maybeDescriptor =
-        descriptorPath.getDescriptorByName(messageType);
-    if (!maybeDescriptor.isPresent()) {
-      throw new IllegalStateException(
-          "Error while processing a router definition. Unable to locate a message "
-              + messageType
-              + " in a descriptor set "
-              + descriptorSetPath);
-    }
-    return ProtobufRouter.forAddressTemplate(
-        (Descriptors.Descriptor) maybeDescriptor.get(), addressTemplate);
-  }
-
-  private static ProtobufDescriptorMap protobufDescriptorMap(String descriptorSetPath) {
-    try {
-      URL url = ResourceLocator.findNamedResource(descriptorSetPath);
-      if (url == null) {
-        throw new IllegalArgumentException(
-            "Unable to locate a Protobuf descriptor set at " + descriptorSetPath);
-      }
-      return ProtobufDescriptorMap.from(url);
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "Error while processing a router definition. Unable to read the descriptor set at  "
-              + descriptorSetPath,
-          e);
-    }
-  }
-
-  private static IngressIdentifier<Message> targetRouterIngress(JsonNode routerNode) {
-    String targetIngress = Selectors.textAt(routerNode, Pointers.Routers.SPEC_INGRESS);
-    NamespaceNamePair nn = NamespaceNamePair.from(targetIngress);
-    return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
-  }
-
-  private static void requireProtobufRouterType(JsonNode routerNode) {
-    String routerType = Selectors.textAt(routerNode, Pointers.Routers.META_TYPE);
-    if (!routerType.equalsIgnoreCase("org.apache.flink.statefun.sdk/protobuf-router")) {
-      throw new IllegalStateException("Invalid router type " + routerType);
-    }
-  }
-
-  // ----------------------------------------------------------------------------------------------------------
-  // Functions
-  // ----------------------------------------------------------------------------------------------------------
-
-  private static FunctionSpec parseFunctionSpec(JsonNode functionNode) {
-    String functionKind = Selectors.textAt(functionNode, Pointers.Functions.META_KIND);
-    FunctionSpec.Kind kind = Kind.valueOf(functionKind.toUpperCase(Locale.getDefault()));
-    FunctionType functionType = functionType(functionNode);
-    switch (kind) {
-      case HTTP:
-        return new HttpFunctionSpec(
-            functionType,
-            functionUri(functionNode),
-            functionStates(functionNode),
-            maxRequestDuration(functionNode),
-            maxNumBatchRequests(functionNode));
-      case GRPC:
-        return new GrpcFunctionSpec(functionType, functionAddress(functionNode));
-      default:
-        throw new IllegalArgumentException("Unrecognized function kind " + functionKind);
-    }
-  }
-
-  private static int maxNumBatchRequests(JsonNode functionNode) {
-    return Selectors.optionalIntegerAt(functionNode, Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS)
-        .orElse(DEFAULT_MAX_NUM_BATCH_REQUESTS);
-  }
-
-  private static Duration maxRequestDuration(JsonNode functionNode) {
-    return Selectors.optionalTextAt(functionNode, Functions.FUNCTION_TIMEOUT)
-        .map(TimeUtils::parseDuration)
-        .orElse(DEFAULT_HTTP_TIMEOUT);
-  }
-
-  private static List<String> functionStates(JsonNode functionNode) {
-    return Selectors.textListAt(functionNode, Pointers.Functions.FUNCTION_STATES);
-  }
-
-  private static FunctionType functionType(JsonNode functionNode) {
-    String namespaceName = Selectors.textAt(functionNode, Pointers.Functions.META_TYPE);
-    NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);
-    return new FunctionType(nn.namespace(), nn.name());
-  }
-
-  private static InetSocketAddress functionAddress(JsonNode functionNode) {
-    String host = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_HOSTNAME);
-    int port = Selectors.integerAt(functionNode, Pointers.Functions.FUNCTION_PORT);
-    return new InetSocketAddress(host, port);
-  }
-
-  private static URI functionUri(JsonNode functionNode) {
-    String uri = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_ENDPOINT);
-    URI typedUri = URI.create(uri);
-    @Nullable String scheme = typedUri.getScheme();
-    if (scheme == null) {
-      throw new IllegalArgumentException(
-          "Missing scheme in function endpoint "
-              + uri
-              + "; an http or https scheme must be provided.");
-    }
-    if (scheme.equalsIgnoreCase("http")
-        || scheme.equalsIgnoreCase("https")
-        || scheme.equalsIgnoreCase("http+unix")
-        || scheme.equalsIgnoreCase("https+unix")) {
-      return typedUri;
-    }
-    throw new IllegalArgumentException(
-        "Missing scheme in function endpoint "
-            + uri
-            + "; an http or https or http+unix or https+unix scheme must be provided.");
-  }
-
-  private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() {
-    return toMap(FunctionSpec::functionType, Function.identity());
-  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java
deleted file mode 100644
index 3616bcd..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.flink.core.jsonmodule;
-
-import java.net.URL;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.statefun.flink.common.json.Selectors;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-
-final class JsonModuleFactory {
-
-  private JsonModuleFactory() {}
-
-  static StatefulFunctionModule create(JsonNode root, URL moduleUrl) {
-    final FormatVersion formatVersion = formatVersion(root);
-    final JsonNode spec = root.at(Pointers.MODULE_SPEC);
-
-    switch (formatVersion) {
-      case v1_0:
-        return new JsonModule(spec, moduleUrl);
-      default:
-        throw new IllegalArgumentException("Unrecognized format version: " + formatVersion);
-    }
-  }
-
-  private static FormatVersion formatVersion(JsonNode root) {
-    String versionStr = Selectors.textAt(root, Pointers.FORMAT_VERSION);
-    return FormatVersion.fromString(versionStr);
-  }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
index 0ac7712..84e3eba 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
@@ -26,6 +26,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import org.apache.flink.statefun.flink.common.ResourceLocator;
+import org.apache.flink.statefun.flink.common.json.Selectors;
 import org.apache.flink.statefun.flink.core.spi.Constants;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 
@@ -45,8 +46,9 @@ public final class JsonServiceLoader {
   @VisibleForTesting
   static StatefulFunctionModule fromUrl(ObjectMapper mapper, URL moduleUrl) {
     try {
-      JsonNode root = readAndValidateModuleTree(mapper, moduleUrl);
-      return JsonModuleFactory.create(root, moduleUrl);
+      final JsonNode root = readAndValidateModuleTree(mapper, moduleUrl);
+      return new JsonModule(
+          requireValidModuleSpecNode(moduleUrl, root), requireValidFormatVersion(root), moduleUrl);
     } catch (Throwable t) {
       throw new RuntimeException("Failed loading a module at " + moduleUrl, t);
     }
@@ -55,21 +57,13 @@ public final class JsonServiceLoader {
   /**
    * Read a {@code StatefulFunction} module definition.
    *
-   * <p>A valid resource module definition has to contain the following sections:
-   *
-   * <ul>
-   *   <li>meta - contains the metadata associated with this module, such as its type.
-   *   <li>spec - a specification of the module. i.e. the definied functions, routers etc'.
-   * </ul>
-   *
-   * <p>If any of these sections are missing, this would be considered an invalid module definition,
-   * in addition a type is a mandatory field of a module spec.
+   * <p>A valid resource module definition has to contain the metadata associated with this module,
+   * such as its type.
    */
   private static JsonNode readAndValidateModuleTree(ObjectMapper mapper, URL moduleYamlFile)
       throws IOException {
     JsonNode root = mapper.readTree(moduleYamlFile);
     validateMeta(moduleYamlFile, root);
-    validateSpec(moduleYamlFile, root);
     return root;
   }
 
@@ -87,10 +81,19 @@ public final class JsonServiceLoader {
     }
   }
 
-  private static void validateSpec(URL moduleYamlFile, JsonNode root) {
-    if (root.at(Pointers.MODULE_SPEC).isMissingNode()) {
+  private static JsonNode requireValidModuleSpecNode(URL moduleYamlFile, JsonNode root) {
+    final JsonNode moduleSpecNode = root.at(Pointers.MODULE_SPEC);
+
+    if (moduleSpecNode.isMissingNode()) {
       throw new IllegalStateException("A module without a spec at " + moduleYamlFile);
     }
+
+    return moduleSpecNode;
+  }
+
+  private static FormatVersion requireValidFormatVersion(JsonNode root) {
+    final String formatVersion = Selectors.textAt(root, Pointers.FORMAT_VERSION);
+    return FormatVersion.fromString(formatVersion);
   }
 
   @VisibleForTesting
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
index f1f3aa1..28ee1fb 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
@@ -22,10 +22,8 @@ import static org.junit.Assert.assertThat;
 
 import com.google.protobuf.Any;
 import com.google.protobuf.Message;
-import java.io.IOException;
 import java.net.URL;
 import java.util.Collections;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
@@ -98,13 +96,7 @@ public class JsonModuleTest {
     URL moduleUrl = JsonModuleTest.class.getClassLoader().getResource(path);
     assertThat(moduleUrl, not(nullValue()));
     ObjectMapper mapper = JsonServiceLoader.mapper();
-    final JsonNode json;
-    try {
-      json = mapper.readTree(moduleUrl);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return JsonModuleFactory.create(json, moduleUrl);
+    return JsonServiceLoader.fromUrl(mapper, moduleUrl);
   }
 
   private static StatefulFunctionsUniverse emptyUniverse() {
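
Note on the JsonServiceLoader change above: the former void check (validateSpec) is replaced by "require" helpers that return the value they validated, so fromUrl can pass the results straight to the JsonModule constructor. The following is a minimal sketch of that pattern only. It is not the project's code. It uses a plain java.util.Map instead of Jackson's JsonNode so that it runs with the JDK alone. The class name ModuleLoaderSketch, the top-level keys "spec" and "version", and the exception thrown for a missing version are assumptions made for illustration.

```java
import java.util.Map;

/** Hypothetical stand-in that illustrates "validate and return" helpers. */
final class ModuleLoaderSketch {

  private ModuleLoaderSketch() {}

  /** Returns the "spec" section of the module, or fails fast if it is absent. */
  static Map<String, Object> requireValidModuleSpec(String moduleName, Map<String, Object> root) {
    Object spec = root.get("spec"); // assumed key, standing in for Pointers.MODULE_SPEC
    if (!(spec instanceof Map)) {
      throw new IllegalStateException("A module without a spec at " + moduleName);
    }
    @SuppressWarnings("unchecked")
    Map<String, Object> typedSpec = (Map<String, Object>) spec;
    return typedSpec;
  }

  /** Returns the format version string, or fails fast if it is absent. */
  static String requireValidFormatVersion(Map<String, Object> root) {
    Object version = root.get("version"); // assumed key, standing in for Pointers.FORMAT_VERSION
    if (!(version instanceof String)) {
      // Assumption: the real Selectors.textAt may signal a missing field differently.
      throw new IllegalArgumentException("Missing format version");
    }
    return (String) version;
  }
}
```

Because each helper returns its validated value, the caller reads the tree once and never has to look up the same node again after validation.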


[flink-statefun] 10/10: [FLINK-17997] [legal] Revert manual merging of AWS KPL's THIRD_PARTY_NOTICE content

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 4b96b432a0ce0fa075870596d74248a8d95519c8
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 28 16:25:09 2020 +0800

    [FLINK-17997] [legal] Revert manual merging of AWS KPL's THIRD_PARTY_NOTICE content
    
    We can revert this now, since the upstream dependency (Flink Kinesis
    Connector) now handles the content properly.
    
    This closes #121.
---
 statefun-flink/statefun-flink-distribution/pom.xml |  16 --
 .../src/main/resources/META-INF/NOTICE             | 197 ---------------------
 2 files changed, 213 deletions(-)

diff --git a/statefun-flink/statefun-flink-distribution/pom.xml b/statefun-flink/statefun-flink-distribution/pom.xml
index 5b9dfc5..3e0f5ff 100644
--- a/statefun-flink/statefun-flink-distribution/pom.xml
+++ b/statefun-flink/statefun-flink-distribution/pom.xml
@@ -160,22 +160,6 @@ under the License.
                                         <exclude>NOTICE</exclude>
                                     </excludes>
                                 </filter>
-                                <filter>
-                                    <artifact>org.apache.flink:flink-connector-kinesis_${scala.binary.version}:*</artifact>
-                                    <excludes>
-                                        <!--
-                                            This file comes from AWS Kinesis Producer Library, and due to its name, the
-                                            file is not detected by the ApacheNoticeResourceTransformer, and therefore
-                                            the content not automatically merged to our own NOTICE file.
-
-                                            We manually merge the contents, and exclude it for packaging.
-
-                                            TODO this should be fixed in the upstream flink-connector-kinesis dependency.
-                                            TODO once that happens, we can remove this here.
-                                        -->
-                                        <exclude>META-INF/THIRD_PARTY_NOTICES</exclude>
-                                    </excludes>
-                                </filter>
                             </filters>
                             <transformers>
                                 <transformer
diff --git a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
index 1cd43cc..ddf1933 100644
--- a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
+++ b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
@@ -31,200 +31,3 @@ See bundled license files under "META-INF/licenses" for details.
 
 - com.github.luben:zstd-jni:1.3.8-1
 - com.google.protobuf:protobuf-java:3.7.1
-
-This project bundles com.amazonaws:amazon-kinesis-producer:0.13.1, licensed under the Apache Software License 2.0, which contains the following notice:
-
-======= Start of THIRD_PARTY_NOTICES in com.amazonaws:amazon-kinesis-producer:0.13.1 ======
-
-The Amazon Kinesis Producer Library includes http-parser, Copyright (c) Joyent, Inc. and other Node contributors,
-libc++, Copyright (c) 2003-2014, LLVM Project, and slf4j, Copyright (c) 2004-2013 QOS.ch, each of which is subject to
-the terms and conditions of the MIT license that states as follows:
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
-documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
-rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
-sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following
-conditions:
-
-The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
-WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
-COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-The Amazon Kinesis Producer Library includes Protocol Buffers, Copyright (c) 2014, Google Inc. (except for
-atomicops_internals_generic_gcc.h, which is Copyright (c) Red Hat Inc., atomicops_internals_aix.h, which is
-Copyright (c) Bloomberg Finance LP, and Andorid.mk, which is Copyright (c) The Android Open Source Project), base64,
-Copyright (c) 2013, Alfred Klomp, glog, Copyright (c) 2008, Google Inc., libcxxrt, Copyright (c) 2010-2011 PathScale,
-Inc., and LLVM + clang, Copyright (c) 2003-2014 University of Illinois at Urbana-Champaign, each of which is subject to
-the terms and conditions of the BSD license that states as follows:
-
-Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
-following conditions are met:
-
-* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
-
-* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
-disclaimer in the documentation and/or other materials provided with the distribution.
-
-* Neither the name of the author nor the names of its contributors may be used to endorse or promote products derived
-from this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
-INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
-WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-The Amazon Kinesis Producer Library includes Boost C++ Libraries, which is subject to the
-Boost Software License - Version 1.0 that states as follows:
-
-Permission is hereby granted, free of charge, to any person or organization obtaining a copy of the software and
-accompanying documentation covered by this license (the "Software") to use, reproduce, display, distribute, execute,
-and transmit the Software, and to prepare derivative works of the
-Software, and to permit third-parties to whom the Software is furnished to do so, all subject to the following:
-
-The copyright notices in the Software and this entire statement, including the above license grant, this restriction
-and the following disclaimer, must be included in all copies of the Software, in whole or in part, and all derivative
-works of the Software, unless such copies or derivative works are solely in the form of machine-executable object code
-generated by a source language processor.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
-WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
-COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT,
-TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-The Amazon Kinesis Producer Library includes OpenSSL, which is subject to the following terms and conditions:
-
-LICENSE ISSUES
-  ==============
-
-  The OpenSSL toolkit stays under a dual license, i.e. both the conditions of
-  the OpenSSL License and the original SSLeay license apply to the toolkit.
-  See below for the actual license texts. Actually both licenses are BSD-style
-  Open Source licenses. In case of any license issues related to OpenSSL
-  please contact openssl-core@openssl.org.
-
-  OpenSSL License
-  ---------------
-
-/* ====================================================================
- * Copyright (c) 1998-2011 The OpenSSL Project.  All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- *
- * 1. Redistributions of source code must retain the above copyright
- *    notice, this list of conditions and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright
- *    notice, this list of conditions and the following disclaimer in
- *    the documentation and/or other materials provided with the
- *    distribution.
- *
- * 3. All advertising materials mentioning features or use of this
- *    software must display the following acknowledgment:
- *    "This product includes software developed by the OpenSSL Project
- *    for use in the OpenSSL Toolkit. (http://www.openssl.org/)"
- *
- * 4. The names "OpenSSL Toolkit" and "OpenSSL Project" must not be used to
- *    endorse or promote products derived from this software without
- *    prior written permission. For written permission, please contact
- *    openssl-core@openssl.org.
- *
- * 5. Products derived from this software may not be called "OpenSSL"
- *    nor may "OpenSSL" appear in their names without prior written
- *    permission of the OpenSSL Project.
- *
- * 6. Redistributions of any form whatsoever must retain the following
- *    acknowledgment:
- *    "This product includes software developed by the OpenSSL Project
- *    for use in the OpenSSL Toolkit (http://www.openssl.org/)"
- *
- * THIS SOFTWARE IS PROVIDED BY THE OpenSSL PROJECT ``AS IS'' AND ANY
- * EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE OpenSSL PROJECT OR
- * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
- * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
- * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
- * OF THE POSSIBILITY OF SUCH DAMAGE.
- * ====================================================================
- *
- * This product includes cryptographic software written by Eric Young
- * (eay@cryptsoft.com).  This product includes software written by Tim
- * Hudson (tjh@cryptsoft.com).
- *
- */
-
- Original SSLeay License
- -----------------------
-
-/* Copyright (C) 1995-1998 Eric Young (eay@cryptsoft.com)
- * All rights reserved.
- *
- * This package is an SSL implementation written
- * by Eric Young (eay@cryptsoft.com).
- * The implementation was written so as to conform with Netscapes SSL.
- *
- * This library is free for commercial and non-commercial use as long as
- * the following conditions are aheared to.  The following conditions
- * apply to all code found in this distribution, be it the RC4, RSA,
- * lhash, DES, etc., code; not just the SSL code.  The SSL documentation
- * included with this distribution is covered by the same copyright terms
- * except that the holder is Tim Hudson (tjh@cryptsoft.com).
- *
- * Copyright remains Eric Young's, and as such any Copyright notices in
- * the code are not to be removed.
- * If this package is used in a product, Eric Young should be given attribution
- * as the author of the parts of the library used.
- * This can be in the form of a textual message at program startup or
- * in documentation (online or textual) provided with the package.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the copyright
- *    notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- *    notice, this list of conditions and the following disclaimer in the
- *    documentation and/or other materials provided with the distribution.
- * 3. All advertising materials mentioning features or use of this software
- *    must display the following acknowledgement:
- *    "This product includes cryptographic software written by
- *     Eric Young (eay@cryptsoft.com)"
- *    The word 'cryptographic' can be left out if the rouines from the library
- *    being used are not cryptographic related :-).
- * 4. If you include any Windows specific code (or a derivative thereof) from
- *    the apps directory (application code) you must include an acknowledgement:
- *    "This product includes software written by Tim Hudson (tjh@cryptsoft.com)"
- *
- * THIS SOFTWARE IS PROVIDED BY ERIC YOUNG ``AS IS'' AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- * SUCH DAMAGE.
- *
- * The licence and distribution terms for any publically available version or
- * derivative of this code cannot be changed.  i.e. this code cannot simply be
- * copied and put under another distribution licence
- * [including the GNU Public Licence.]
- */
-
-The Amazon Kinesis Producer Library includes Guava, which is subject to the terms and conditions of the Apache License Version 2.0.
-
-======= End of THIRD_PARTY_NOTICES in com.amazonaws:amazon-kinesis-producer:0.13.1 ======


[flink-statefun] 07/10: [FLINK-17875] [core] Add format version 2.0

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 3c8e1f46aa14ec6f4da3ddabd2234f5260f5c09a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 28 14:59:58 2020 +0800

    [FLINK-17875] [core] Add format version 2.0
---
 .../apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
index 8f01a33..debb28c 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
@@ -19,7 +19,8 @@
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
 enum FormatVersion {
-  v1_0("1.0");
+  v1_0("1.0"),
+  v2_0("2.0");
 
   private String versionStr;
 
@@ -36,6 +37,8 @@ enum FormatVersion {
     switch (versionStr) {
       case "1.0":
         return v1_0;
+      case "2.0":
+        return v2_0;
       default:
         throw new IllegalArgumentException("Unrecognized format version: " + versionStr);
     }
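
For reference, the enum after this change can be sketched as below. This is a reconstruction from the two hunks above, not a copy of the file: the enum constants, the versionStr field, the switch in fromString and its error message come from the diff, while the name FormatVersionSketch, the constructor, the final modifier and the toString override are assumptions, since those parts of the file are not shown.

```java
/** Hypothetical reconstruction of FormatVersion with the new 2.0 constant. */
enum FormatVersionSketch {
  v1_0("1.0"),
  v2_0("2.0");

  private final String versionStr;

  // Assumed constructor: the diff only shows the field declaration.
  FormatVersionSketch(String versionStr) {
    this.versionStr = versionStr;
  }

  // Assumed override, added so the constant prints as its version string.
  @Override
  public String toString() {
    return versionStr;
  }

  /** Maps a version string from a module definition to its enum constant. */
  static FormatVersionSketch fromString(String versionStr) {
    switch (versionStr) {
      case "1.0":
        return v1_0;
      case "2.0":
        return v2_0;
      default:
        throw new IllegalArgumentException("Unrecognized format version: " + versionStr);
    }
  }
}
```

With this in place, fromString("2.0") resolves to v2_0, and any other unknown string still fails with IllegalArgumentException, as it did before the commit.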