You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/06 01:38:52 UTC

[flink-statefun] branch master updated (9c988a0 -> adb1b2d)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 9c988a0  [FLINK-16396] [kafka] Allow null keys for the YAML generic Kafka egress
     new 8a6b8e1  [FLINK-16415] [core] Introduce JsonModuleFactory
     new edfcbcb  [FLINK-16415] [core] Use JsonModuleFactory in JsonServiceLoader
     new adb1b2d  [FLINK-16415] [tests] Adapt tests to cover the new version field in YAML modules

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../routable-kafka-ingress-module/module.yaml      |  2 ++
 .../{FunctionSpec.java => FormatVersion.java}      | 25 ++++++++++++-----
 .../flink/core/jsonmodule/JsonModuleFactory.java}  | 32 +++++++++++++---------
 .../flink/core/jsonmodule/JsonServiceLoader.java   |  3 +-
 .../statefun/flink/core/jsonmodule/Pointers.java   |  2 ++
 .../flink/core/jsonmodule/JsonModuleTest.java      |  3 +-
 .../src/test/resources/bar-module/module.yaml      |  2 ++
 7 files changed, 45 insertions(+), 24 deletions(-)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/{FunctionSpec.java => FormatVersion.java} (66%)
 copy statefun-flink/{statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/testutils/YamlUtils.java => statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java} (52%)


[flink-statefun] 02/03: [FLINK-16415] [core] Use JsonModuleFactory in JsonServiceLoader

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit edfcbcb7e908bc7d08d548f24dbec213ca6d4c8a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Mar 4 19:57:47 2020 +0800

    [FLINK-16415] [core] Use JsonModuleFactory in JsonServiceLoader
---
 .../apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
index e170260..0ac7712 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
@@ -46,8 +46,7 @@ public final class JsonServiceLoader {
   static StatefulFunctionModule fromUrl(ObjectMapper mapper, URL moduleUrl) {
     try {
       JsonNode root = readAndValidateModuleTree(mapper, moduleUrl);
-      JsonNode spec = root.at(Pointers.MODULE_SPEC);
-      return new JsonModule(spec, moduleUrl);
+      return JsonModuleFactory.create(root, moduleUrl);
     } catch (Throwable t) {
       throw new RuntimeException("Failed loading a module at " + moduleUrl, t);
     }


[flink-statefun] 03/03: [FLINK-16415] [tests] Adapt tests to cover the new version field in YAML modules

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit adb1b2d871292bd87b5652d0f0b04aa85ee39394
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Mar 4 19:58:43 2020 +0800

    [FLINK-16415] [tests] Adapt tests to cover the new version field in YAML modules
    
    This closes #50.
---
 .../src/test/resources/routable-kafka-ingress-module/module.yaml       | 2 ++
 .../apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java    | 3 +--
 .../statefun-flink-core/src/test/resources/bar-module/module.yaml      | 2 ++
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml
index d9f1b5e..98fee69 100644
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml
+++ b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+version: "1.0"
+
 module:
   meta:
     type: remote
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
index b4d307c..a261ea4 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
@@ -102,8 +102,7 @@ public class JsonModuleTest {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    JsonNode spec = json.at("/module/spec");
-    return new JsonModule(spec, moduleUrl);
+    return JsonModuleFactory.create(json, moduleUrl);
   }
 
   private static StatefulFunctionsUniverse emptyUniverse() {
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
index e7afab5..61aa52f 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+version: "1.0"
+
 module:
   meta:
     type: remote


[flink-statefun] 01/03: [FLINK-16415] [core] Introduce JsonModuleFactory

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 8a6b8e1cee25d2950b1090967423d9460321be25
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Mar 4 19:52:59 2020 +0800

    [FLINK-16415] [core] Introduce JsonModuleFactory
    
    The JsonModuleFactory is a static factory that creates JsonModules based
    on the specified format version.
    
    We currently only have one version, 1.0, and one corresponding
    JsonModule, i.e. the JsonModule class. We need to further refactor
    Pointers and JsonModule in the future once we introduce more versions.
---
 .../flink/core/jsonmodule/FormatVersion.java       | 43 ++++++++++++++++++++
 .../flink/core/jsonmodule/JsonModuleFactory.java   | 46 ++++++++++++++++++++++
 .../statefun/flink/core/jsonmodule/Pointers.java   |  2 +
 3 files changed, 91 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
new file mode 100644
index 0000000..8f01a33
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+enum FormatVersion {
+  v1_0("1.0");
+
+  private String versionStr;
+
+  FormatVersion(String versionStr) {
+    this.versionStr = versionStr;
+  }
+
+  @Override
+  public String toString() {
+    return versionStr;
+  }
+
+  static FormatVersion fromString(String versionStr) {
+    switch (versionStr) {
+      case "1.0":
+        return v1_0;
+      default:
+        throw new IllegalArgumentException("Unrecognized format version: " + versionStr);
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java
new file mode 100644
index 0000000..3616bcd
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import java.net.URL;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+final class JsonModuleFactory {
+
+  private JsonModuleFactory() {}
+
+  static StatefulFunctionModule create(JsonNode root, URL moduleUrl) {
+    final FormatVersion formatVersion = formatVersion(root);
+    final JsonNode spec = root.at(Pointers.MODULE_SPEC);
+
+    switch (formatVersion) {
+      case v1_0:
+        return new JsonModule(spec, moduleUrl);
+      default:
+        throw new IllegalArgumentException("Unrecognized format version: " + formatVersion);
+    }
+  }
+
+  private static FormatVersion formatVersion(JsonNode root) {
+    String versionStr = Selectors.textAt(root, Pointers.FORMAT_VERSION);
+    return FormatVersion.fromString(versionStr);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
index cc7caec..5c5b973 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
@@ -23,6 +23,8 @@ public final class Pointers {
 
   private Pointers() {}
 
+  public static final JsonPointer FORMAT_VERSION = JsonPointer.compile("/version");
+
   // -------------------------------------------------------------------------------------
   // top level spec definition
   // -------------------------------------------------------------------------------------