You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/21 07:12:27 UTC

[flink-statefun] branch release-2.0 updated (12576da -> 3d5522e)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 12576da  [hotfix] Avoid raw types in UnboundedFeedbackLoggerTest
     new f9d5c7a  [FLINK-17518] [python] Make Python SDK build script oblivious of working directory
     new 2220a4a  [FLINK-17518] [build] Add a root.dir property to Maven
     new 25e052f  [FLINK-17518] [e2e] Build Python SDK with -Prun-e2e-tests
     new ee596a0  [FLINK-17518] [e2e] Add resultsInAnyOrder to KafkaIOVerifier
     new 74aabbd  [FLINK-17518] [e2e] Add remote functions for remote module E2E
     new 82b238d  [FLINK-17518] [e2e] Add remote module E2E
     new cc8cbe4  [FLINK-17518] [e2e] Remove routable Kafka E2E test
     new 3d5522e  [hotfix] [build] Fix path to spotbugs exclude filter file

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |  21 ++-
 statefun-e2e-tests/pom.xml                         |  20 ++-
 .../statefun/e2e/common/kafka/KafkaIOVerifier.java |  53 +++++++
 .../pom.xml                                        |  25 +--
 .../main/protobuf/remote-module-verification.proto |  27 ++--
 .../src/main/python/functions.py                   |  97 ++++++++++++
 .../main/python/remote_module_verification_pb2.py  | 149 ++++++++++++++++++
 .../flink/statefun/e2e/remote/RemoteModuleE2E.java | 167 +++++++++++++++++++++
 .../src/test/resources/Dockerfile                  |   4 +-
 .../src/test/resources/Dockerfile.remote-function  |  10 +-
 .../src/test/resources/log4j.properties            |   0
 .../src/test/resources/remote-module}/module.yaml  |  39 +++--
 .../src/test/resources}/requirements.txt           |   1 -
 .../statefun/e2e/routablekafka/Constants.java      |  38 -----
 .../e2e/routablekafka/FnSelfAddressTagger.java     |  56 -------
 .../flink/statefun/e2e/routablekafka/KafkaIO.java  |  58 -------
 .../RoutableKafkaVerificationModule.java           |  56 -------
 .../protobuf/routable-kafka-verification.proto     |  41 -----
 .../e2e/routablekafka/RoutableKafkaE2E.java        | 143 ------------------
 .../src/test/resources/Dockerfile                  |  21 ---
 statefun-python-sdk/build-distribution.sh          |  13 +-
 21 files changed, 573 insertions(+), 466 deletions(-)
 rename statefun-e2e-tests/{statefun-routable-kafka-e2e => statefun-remote-module-e2e}/pom.xml (85%)
 copy statefun-examples/statefun-python-walkthrough-example/walkthrough.proto => statefun-e2e-tests/statefun-remote-module-e2e/src/main/protobuf/remote-module-verification.proto (74%)
 create mode 100644 statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/functions.py
 create mode 100644 statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/remote_module_verification_pb2.py
 create mode 100644 statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java
 copy statefun-e2e-tests/{statefun-exactly-once-e2e => statefun-remote-module-e2e}/src/test/resources/Dockerfile (85%)
 copy statefun-examples/statefun-python-k8s-example/Dockerfile.python-worker => statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile.remote-function (77%)
 rename statefun-e2e-tests/{statefun-routable-kafka-e2e => statefun-remote-module-e2e}/src/test/resources/log4j.properties (100%)
 rename statefun-e2e-tests/{statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module => statefun-remote-module-e2e/src/test/resources/remote-module}/module.yaml (54%)
 copy {statefun-examples/statefun-python-greeter-example/greeter => statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources}/requirements.txt (97%)
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile


[flink-statefun] 08/08: [hotfix] [build] Fix path to spotbugs exclude filter file

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 3d5522ecb0b6c56edb384e9685e02df24a894b72
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 21 15:00:14 2020 +0800

    [hotfix] [build] Fix path to spotbugs exclude filter file
    
    This commit fixes the path so that it is absolute, instead of a relative
    path to the root directory. This was causing issues, since developers
    essentially could not build modules individually due to an error
    complaining that the exclude file cannot be located (since the path was
    relative to the root, and only works when building from root).
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index d75d850..9ec80b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -218,7 +218,7 @@ under the License.
                 <configuration>
                     <effort>Max</effort>
                     <threshold>Low</threshold>
-                    <excludeFilterFile>tools/maven/spotbugs-exclude.xml</excludeFilterFile>
+                    <excludeFilterFile>${root.dir}/tools/maven/spotbugs-exclude.xml</excludeFilterFile>
                 </configuration>
             </plugin>
 


[flink-statefun] 06/08: [FLINK-17518] [e2e] Add remote module E2E

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 82b238d676dcef4a20747f08932cda93528768ca
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:30:38 2020 +0800

    [FLINK-17518] [e2e] Add remote module E2E
---
 .../flink/statefun/e2e/remote/RemoteModuleE2E.java | 167 +++++++++++++++++++++
 .../src/test/resources/Dockerfile                  |  20 +++
 .../src/test/resources/Dockerfile.remote-function  |  34 +++++
 .../src/test/resources/log4j.properties            |  24 +++
 .../src/test/resources/remote-module/module.yaml   |  60 ++++++++
 .../src/test/resources/requirements.txt            |  21 +++
 6 files changed, 326 insertions(+)

diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java
new file mode 100644
index 0000000..618150b
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.remote;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
+import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
+import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
+import org.apache.flink.statefun.e2e.remote.generated.RemoteModuleVerification.Invoke;
+import org.apache.flink.statefun.e2e.remote.generated.RemoteModuleVerification.InvokeResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+/**
+ * End-to-end test for a completely YAML-based remote module setup.consisting of
+ *
+ * <p>The setup consists of a auto-routable YAML Kafka ingress, the generic YAML Kafka egress, and
+ * two Python. remote functions: 1) a simple invocation counter function, which gets routed invoke
+ * messages from the auto-routable Kafka ingress, and 2) a simple stateless forwarding. function,
+ * which gets the invocation counts from the counter function and simply forwards them to the Kafka
+ * egress.
+ *
+ * <p>We perform the extra stateless forwarding so that the E2E test scenario covers messaging
+ * between remote functions.
+ */
+public class RemoteModuleE2E {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteModuleE2E.class);
+
+  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
+
+  private static final String KAFKA_HOST = "kafka-broker";
+  private static final String INVOKE_TOPIC = "invoke";
+  private static final String INVOKE_RESULTS_TOPIC = "invoke-results";
+
+  private static final String REMOTE_FUNCTION_HOST = "remote-function";
+
+  @Rule
+  public KafkaContainer kafka =
+      new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
+
+  @Rule
+  public GenericContainer<?> remoteFunction =
+      new GenericContainer<>(remoteFunctionImage())
+          .withNetworkAliases(REMOTE_FUNCTION_HOST)
+          .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+  @Rule
+  public StatefulFunctionsAppContainers verificationApp =
+      StatefulFunctionsAppContainers.builder("remote-module-verification", 2)
+          .dependsOn(kafka)
+          .dependsOn(remoteFunction)
+          .exposeMasterLogs(LOG)
+          .withBuildContextFileFromClasspath("remote-module", "/remote-module/")
+          .build();
+
+  @Test
+  public void run() {
+    final String kafkaAddress = kafka.getBootstrapServers();
+
+    final Producer<String, Invoke> invokeProducer = kafkaKeyedInvokesProducer(kafkaAddress);
+    final Consumer<String, InvokeResult> invokeResultConsumer =
+        kafkaInvokeResultsConsumer(kafkaAddress);
+
+    final KafkaIOVerifier<String, Invoke, String, InvokeResult> verifier =
+        new KafkaIOVerifier<>(invokeProducer, invokeResultConsumer);
+
+    // we verify results come in any order, since the results from the counter function are
+    // being forwarded to the forwarding function with a random key, and therefore
+    // might be written to Kafka out-of-order. We specifically use random keys there
+    // so that the E2E may cover both local handovers and cross-partition messaging via the
+    // feedback loop in the remote module setup.
+    assertThat(
+        verifier.sending(invoke("foo"), invoke("foo"), invoke("bar")),
+        verifier.resultsInAnyOrder(
+            is(invokeResult("foo", 1)), is(invokeResult("foo", 2)), is(invokeResult("bar", 1))));
+  }
+
+  private static ImageFromDockerfile remoteFunctionImage() {
+    final Path pythonSourcePath = remoteFunctionPythonSourcePath();
+    LOG.info("Building remote function image with Python source at: {}", pythonSourcePath);
+
+    final Path pythonSdkPath = pythonSdkPath();
+    LOG.info("Located built Python SDK at: {}", pythonSdkPath);
+
+    return new ImageFromDockerfile("remote-function")
+        .withFileFromClasspath("Dockerfile", "Dockerfile.remote-function")
+        .withFileFromPath("source/", pythonSourcePath)
+        .withFileFromClasspath("requirements.txt", "requirements.txt")
+        .withFileFromPath("python-sdk/", pythonSdkPath);
+  }
+
+  private static Path remoteFunctionPythonSourcePath() {
+    return Paths.get(System.getProperty("user.dir") + "/src/main/python");
+  }
+
+  private static Path pythonSdkPath() {
+    return Paths.get(System.getProperty("user.dir") + "/../../statefun-python-sdk/dist");
+  }
+
+  private static Producer<String, Invoke> kafkaKeyedInvokesProducer(String bootstrapServers) {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", bootstrapServers);
+
+    return new KafkaProducer<>(
+        props, new StringSerializer(), new KafkaProtobufSerializer<>(Invoke.parser()));
+  }
+
+  private Consumer<String, InvokeResult> kafkaInvokeResultsConsumer(String bootstrapServers) {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
+    consumerProps.setProperty("group.id", "remote-module-e2e");
+    consumerProps.setProperty("auto.offset.reset", "earliest");
+
+    KafkaConsumer<String, InvokeResult> consumer =
+        new KafkaConsumer<>(
+            consumerProps,
+            new StringDeserializer(),
+            new KafkaProtobufSerializer<>(InvokeResult.parser()));
+    consumer.subscribe(Collections.singletonList(INVOKE_RESULTS_TOPIC));
+
+    return consumer;
+  }
+
+  private static ProducerRecord<String, Invoke> invoke(String target) {
+    return new ProducerRecord<>(INVOKE_TOPIC, target, Invoke.getDefaultInstance());
+  }
+
+  private static InvokeResult invokeResult(String id, int invokeCount) {
+    return InvokeResult.newBuilder().setId(id).setInvokeCount(invokeCount).build();
+  }
+}
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile
new file mode 100644
index 0000000..e3f8b9d
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM flink-statefun:2.1-SNAPSHOT
+
+RUN mkdir -p /opt/statefun/modules/statefun-remote-module-e2e
+COPY remote-module/ /opt/statefun/modules/statefun-remote-module-e2e/
+COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile.remote-function b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile.remote-function
new file mode 100644
index 0000000..7eb3ffe
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile.remote-function
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM python:3.7-alpine
+
+RUN mkdir -p /app
+WORKDIR /app
+
+COPY python-sdk/apache_flink_statefun-*-py3-none-any.whl /app
+RUN pip install apache_flink_statefun-*-py3-none-any.whl
+
+COPY requirements.txt /app
+RUN pip install -r requirements.txt
+
+COPY source/functions.py /app
+COPY source/remote_module_verification_pb2.py /app
+
+EXPOSE 8000
+
+CMD ["gunicorn", "-b", "0.0.0.0:8000", "-w 1", "functions:app"]
+
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb965d3
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/remote-module/module.yaml
new file mode 100644
index 0000000..fc1e57c
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/remote-module/module.yaml
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "1.0"
+
+module:
+  meta:
+    type: remote
+  spec:
+    functions:
+      - function:
+          meta:
+            kind: http
+            type: org.apache.flink.statefun.e2e.remote/counter
+          spec:
+            endpoint: http://remote-function:8000/service
+            states:
+              - invoke_count
+            maxNumBatchRequests: 10000
+      - function:
+          meta:
+            kind: http
+            type: org.apache.flink.statefun.e2e.remote/forward-function
+          spec:
+            endpoint: http://remote-function:8000/service
+            maxNumBatchRequests: 10000
+    ingresses:
+      - ingress:
+          meta:
+            type: statefun.kafka.io/routable-protobuf-ingress
+            id: org.apache.flink.statefun.e2e.remote/invoke
+          spec:
+            address: kafka-broker:9092
+            consumerGroupId: remote-module-e2e
+            startupPosition:
+              type: earliest
+            topics:
+              - topic: invoke
+                typeUrl: com.googleapis/org.apache.flink.statefun.e2e.remote.Invoke
+                targets:
+                  - org.apache.flink.statefun.e2e.remote/counter
+    egresses:
+      - egress:
+          meta:
+            type: statefun.kafka.io/generic-egress
+            id: org.apache.flink.statefun.e2e.remote/invoke-results
+          spec:
+            address: kafka-broker:9092
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/requirements.txt b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/requirements.txt
new file mode 100644
index 0000000..2b89e19
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/requirements.txt
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+flask==1.1.1
+gunicorn==20.0.4
+
+
+


[flink-statefun] 02/08: [FLINK-17518] [build] Add a root.dir property to Maven

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 2220a4a4f16810c15daf1947c96b1b343fec7fcf
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 21 12:58:20 2020 +0800

    [FLINK-17518] [build] Add a root.dir property to Maven
    
    Maven, by default, doesn't have a property that points to the root
    parent module's directory. We work around that by using the
    directory-maven-plugin to define that.
---
 pom.xml | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/pom.xml b/pom.xml
index 40e164a..d75d850 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,7 @@ under the License.
         <protobuf.version>3.7.1</protobuf.version>
         <protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
         <flink.version>1.10.1</flink.version>
+        <root.dir>${rootDir}</root.dir>
     </properties>
 
     <dependencies>
@@ -312,6 +313,24 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
+
+            <plugin>
+                <groupId>org.commonjava.maven.plugins</groupId>
+                <artifactId>directory-maven-plugin</artifactId>
+                <version>0.1</version>
+                <executions>
+                    <execution>
+                        <id>directories</id>
+                        <goals>
+                            <goal>highest-basedir</goal>
+                        </goals>
+                        <phase>initialize</phase>
+                        <configuration>
+                            <property>rootDir</property>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>


[flink-statefun] 04/08: [FLINK-17518] [e2e] Add resultsInAnyOrder to KafkaIOVerifier

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit ee596a0f6ad110994c80c8e55e77b328d90fbc93
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:26:24 2020 +0800

    [FLINK-17518] [e2e] Add resultsInAnyOrder to KafkaIOVerifier
    
    This commit adds an additional kind of matcher that matches Kafka
    outputs in any order that they are consumed.
---
 .../statefun/e2e/common/kafka/KafkaIOVerifier.java | 53 ++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/kafka/KafkaIOVerifier.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/kafka/KafkaIOVerifier.java
index cea79b5..c70dfc6 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/kafka/KafkaIOVerifier.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/kafka/KafkaIOVerifier.java
@@ -19,6 +19,10 @@
 package org.apache.flink.statefun.e2e.common.kafka;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -147,6 +151,44 @@ public final class KafkaIOVerifier<PK, PV, CK, CV> {
     };
   }
 
+  /**
+   * Matcher for verifying the outputs, happening in any order, as a result of calling {@link
+   * #sending(ProducerRecord[])}.
+   *
+   * @param expectedResults matchers for the expected results.
+   * @return a matcher for verifying the output of calling {@link #sending(ProducerRecord[])}.
+   */
+  @SafeVarargs
+  public final Matcher<OutputsHandoff<CV>> resultsInAnyOrder(Matcher<CV>... expectedResults) {
+    return new TypeSafeMatcher<OutputsHandoff<CV>>() {
+      @Override
+      protected boolean matchesSafely(OutputsHandoff<CV> outputHandoff) {
+        final List<Matcher<CV>> expectedResultsList =
+            new ArrayList<>(Arrays.asList(expectedResults));
+
+        try {
+          while (!expectedResultsList.isEmpty()) {
+            CV output = outputHandoff.take();
+            if (!checkAndRemoveIfMatch(expectedResultsList, output)) {
+              return false;
+            }
+          }
+
+          // any dangling unexpected output should count as a mismatch
+          // TODO should we poll with timeout for a stronger verification?
+          return outputHandoff.peek() == null;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        } finally {
+          outputHandoff.verified();
+        }
+      }
+
+      @Override
+      public void describeTo(Description description) {}
+    };
+  }
+
   private static final class OutputsHandoff<T> extends LinkedBlockingQueue<T> {
 
     private static final long serialVersionUID = 1L;
@@ -161,4 +203,15 @@ public final class KafkaIOVerifier<PK, PV, CK, CV> {
       this.isVerified = true;
     }
   }
+
+  private static <CV> boolean checkAndRemoveIfMatch(List<Matcher<CV>> expectedResultsList, CV in) {
+    final Iterator<Matcher<CV>> matchersIterator = expectedResultsList.iterator();
+    while (matchersIterator.hasNext()) {
+      if (matchersIterator.next().matches(in)) {
+        matchersIterator.remove();
+        return true;
+      }
+    }
+    return false;
+  }
 }


[flink-statefun] 05/08: [FLINK-17518] [e2e] Add remote functions for remote module E2E

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 74aabbdf9682dc8978abb30697e422f345491edc
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:30:02 2020 +0800

    [FLINK-17518] [e2e] Add remote functions for remote module E2E
---
 statefun-e2e-tests/pom.xml                         |   1 +
 .../statefun-remote-module-e2e/pom.xml             | 105 +++++++++++++++
 .../main/protobuf/remote-module-verification.proto |  35 +++++
 .../src/main/python/functions.py                   |  97 ++++++++++++++
 .../main/python/remote_module_verification_pb2.py  | 149 +++++++++++++++++++++
 5 files changed, 387 insertions(+)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index b49c77c..c6bccd9 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -32,6 +32,7 @@ under the License.
         <module>statefun-e2e-tests-common</module>
         <module>statefun-sanity-e2e</module>
         <module>statefun-routable-kafka-e2e</module>
+        <module>statefun-remote-module-e2e</module>
         <module>statefun-exactly-once-e2e</module>
     </modules>
 
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/pom.xml b/statefun-e2e-tests/statefun-remote-module-e2e/pom.xml
new file mode 100644
index 0000000..e904370
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>statefun-e2e-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>2.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-remote-module-e2e</artifactId>
+
+    <properties>
+        <testcontainers.version>1.12.5</testcontainers.version>
+        <kafka.version>2.2.0</kafka.version>
+    </properties>
+
+    <dependencies>
+        <!-- Kafka clients -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Protobuf -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+
+        <!-- logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.15</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- End-to-end test common -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-e2e-tests-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Testcontainers KafkaContainer -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <version>${protoc-jar-maven-plugin.version}</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/main/protobuf/remote-module-verification.proto b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/protobuf/remote-module-verification.proto
new file mode 100644
index 0000000..1cafcf1
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/protobuf/remote-module-verification.proto
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.e2e.remote;
+option java_package = "org.apache.flink.statefun.e2e.remote.generated";
+option java_multiple_files = false;
+
+message Invoke {
+}
+
+message InvokeCount {
+    int32 count = 1;
+}
+
+message InvokeResult {
+    string id = 1;
+    int32 invoke_count = 2;
+}
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/functions.py b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/functions.py
new file mode 100644
index 0000000..be2abe4
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/functions.py
@@ -0,0 +1,97 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from remote_module_verification_pb2 import Invoke, InvokeResult, InvokeCount
+
+from statefun import StatefulFunctions
+from statefun import RequestReplyHandler
+from statefun import kafka_egress_record
+
+import uuid
+
+functions = StatefulFunctions()
+
+
+@functions.bind("org.apache.flink.statefun.e2e.remote/counter")
+def counter(context, invoke: Invoke):
+    """
+    Keeps count of the number of invocations, and forwards that count
+    to be sent to the Kafka egress. We do the extra forwarding instead
+    of directly sending to Kafka, so that we cover inter-function
+    messaging in our E2E test.
+    """
+    invoke_count = context.state('invoke_count').unpack(InvokeCount)
+    if not invoke_count:
+        invoke_count = InvokeCount()
+        invoke_count.count = 1
+    else:
+        invoke_count.count += 1
+    context.state('invoke_count').pack(invoke_count)
+
+    response = InvokeResult()
+    response.id = context.address.identity
+    response.invoke_count = invoke_count.count
+
+    context.pack_and_send(
+        "org.apache.flink.statefun.e2e.remote/forward-function",
+        # use random keys to simulate both local handovers and
+        # cross-partition messaging via the feedback loop
+        uuid.uuid4().hex,
+        response
+    )
+
+
+@functions.bind("org.apache.flink.statefun.e2e.remote/forward-function")
+def forward_to_egress(context, invoke_result: InvokeResult):
+    """
+    Simply forwards the results to the Kafka egress.
+    """
+    egress_message = kafka_egress_record(
+        topic="invoke-results",
+        key=invoke_result.id,
+        value=invoke_result
+    )
+    context.pack_and_send_egress(
+        "org.apache.flink.statefun.e2e.remote/invoke-results",
+        egress_message
+    )
+
+
+handler = RequestReplyHandler(functions)
+
+#
+# Serve the endpoint
+#
+
+from flask import request
+from flask import make_response
+from flask import Flask
+
+app = Flask(__name__)
+
+
+@app.route('/service', methods=['POST'])
+def handle():
+    response_data = handler(request.data)
+    response = make_response(response_data)
+    response.headers.set('Content-Type', 'application/octet-stream')
+    return response
+
+
+if __name__ == "__main__":
+    app.run()
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/remote_module_verification_pb2.py b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/remote_module_verification_pb2.py
new file mode 100644
index 0000000..8e0a556
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/remote_module_verification_pb2.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: remote-module-verification.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+  name='remote-module-verification.proto',
+  package='org.apache.flink.statefun.e2e.remote',
+  syntax='proto3',
+  serialized_options=_b('\n.org.apache.flink.statefun.e2e.remote.generatedP\000'),
+  serialized_pb=_b('\n remote-module-verification.proto\x12$org.apache.flink.statefun.e2e.remote\"\x08\n\x06Invoke\"\x1c\n\x0bInvokeCount\x12\r\n\x05\x63ount\x18\x01 \x01(\x05\"0\n\x0cInvokeResult\x12\n\n\x02id\x18\x01 \x01(\t\x12\x14\n\x0cinvoke_count\x18\x02 \x01(\x05\x42\x32\n.org.apache.flink.statefun.e2e.remote.generatedP\x00\x62\x06proto3')
+)
+
+
+
+
+_INVOKE = _descriptor.Descriptor(
+  name='Invoke',
+  full_name='org.apache.flink.statefun.e2e.remote.Invoke',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=74,
+  serialized_end=82,
+)
+
+
+_INVOKECOUNT = _descriptor.Descriptor(
+  name='InvokeCount',
+  full_name='org.apache.flink.statefun.e2e.remote.InvokeCount',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='count', full_name='org.apache.flink.statefun.e2e.remote.InvokeCount.count', index=0,
+      number=1, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=84,
+  serialized_end=112,
+)
+
+
+_INVOKERESULT = _descriptor.Descriptor(
+  name='InvokeResult',
+  full_name='org.apache.flink.statefun.e2e.remote.InvokeResult',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='id', full_name='org.apache.flink.statefun.e2e.remote.InvokeResult.id', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='invoke_count', full_name='org.apache.flink.statefun.e2e.remote.InvokeResult.invoke_count', index=1,
+      number=2, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=114,
+  serialized_end=162,
+)
+
+DESCRIPTOR.message_types_by_name['Invoke'] = _INVOKE
+DESCRIPTOR.message_types_by_name['InvokeCount'] = _INVOKECOUNT
+DESCRIPTOR.message_types_by_name['InvokeResult'] = _INVOKERESULT
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+Invoke = _reflection.GeneratedProtocolMessageType('Invoke', (_message.Message,), dict(
+  DESCRIPTOR = _INVOKE,
+  __module__ = 'remote_module_verification_pb2'
+  # @@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.Invoke)
+  ))
+_sym_db.RegisterMessage(Invoke)
+
+InvokeCount = _reflection.GeneratedProtocolMessageType('InvokeCount', (_message.Message,), dict(
+  DESCRIPTOR = _INVOKECOUNT,
+  __module__ = 'remote_module_verification_pb2'
+  # @@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.InvokeCount)
+  ))
+_sym_db.RegisterMessage(InvokeCount)
+
+InvokeResult = _reflection.GeneratedProtocolMessageType('InvokeResult', (_message.Message,), dict(
+  DESCRIPTOR = _INVOKERESULT,
+  __module__ = 'remote_module_verification_pb2'
+  # @@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.InvokeResult)
+  ))
+_sym_db.RegisterMessage(InvokeResult)
+
+
+DESCRIPTOR._options = None
+# @@protoc_insertion_point(module_scope)


[flink-statefun] 03/08: [FLINK-17518] [e2e] Build Python SDK with -Prun-e2e-tests

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 25e052f07668fc85af88e2f0ba50b7d145db6a79
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:25:33 2020 +0800

    [FLINK-17518] [e2e] Build Python SDK with -Prun-e2e-tests
---
 statefun-e2e-tests/pom.xml | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 83cba8d..b49c77c 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -63,16 +63,28 @@ under the License.
                         <version>1.6.0</version>
                         <executions>
                             <execution>
+                                <id>build-statefun-base-image</id>
+                                <goals>
+                                    <goal>exec</goal>
+                                </goals>
+                                <inherited>false</inherited>
                                 <phase>pre-integration-test</phase>
+                                <configuration>
+                                    <executable>${root.dir}/tools/docker/build-stateful-functions.sh</executable>
+                                </configuration>
+                            </execution>
+                            <execution>
+                                <id>build-python-sdk-packages</id>
                                 <goals>
                                     <goal>exec</goal>
                                 </goals>
                                 <inherited>false</inherited>
+                                <phase>pre-integration-test</phase>
+                                <configuration>
+                                    <executable>${root.dir}/statefun-python-sdk/build-distribution.sh</executable>
+                                </configuration>
                             </execution>
                         </executions>
-                        <configuration>
-                            <executable>${user.dir}/tools/docker/build-stateful-functions.sh</executable>
-                        </configuration>
                     </plugin>
                     <plugin>
                         <artifactId>maven-failsafe-plugin</artifactId>


[flink-statefun] 07/08: [FLINK-17518] [e2e] Remove routable Kafka E2E test

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit cc8cbe4beec3cba981fbf91b60af494405839158
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:32:26 2020 +0800

    [FLINK-17518] [e2e] Remove routable Kafka E2E test
    
    This E2E can be removed since it is completely subsumed by the new
    remote module E2E.
    
    This closes #115.
---
 statefun-e2e-tests/pom.xml                         |   1 -
 .../statefun-routable-kafka-e2e/pom.xml            | 102 ---------------
 .../statefun/e2e/routablekafka/Constants.java      |  38 ------
 .../e2e/routablekafka/FnSelfAddressTagger.java     |  56 --------
 .../flink/statefun/e2e/routablekafka/KafkaIO.java  |  58 ---------
 .../RoutableKafkaVerificationModule.java           |  56 --------
 .../protobuf/routable-kafka-verification.proto     |  41 ------
 .../e2e/routablekafka/RoutableKafkaE2E.java        | 143 ---------------------
 .../src/test/resources/Dockerfile                  |  21 ---
 .../src/test/resources/log4j.properties            |  24 ----
 .../routable-kafka-ingress-module/module.yaml      |  41 ------
 11 files changed, 581 deletions(-)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index c6bccd9..41c5683 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -31,7 +31,6 @@ under the License.
     <modules>
         <module>statefun-e2e-tests-common</module>
         <module>statefun-sanity-e2e</module>
-        <module>statefun-routable-kafka-e2e</module>
         <module>statefun-remote-module-e2e</module>
         <module>statefun-exactly-once-e2e</module>
     </modules>
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml b/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml
deleted file mode 100644
index 9f5797a..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml
+++ /dev/null
@@ -1,102 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xmlns="http://maven.apache.org/POM/4.0.0"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>statefun-e2e-tests</artifactId>
-        <groupId>org.apache.flink</groupId>
-        <version>2.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>statefun-routable-kafka-e2e</artifactId>
-
-    <properties>
-        <testcontainers.version>1.12.5</testcontainers.version>
-    </properties>
-
-    <dependencies>
-        <!-- Stateful Functions -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-sdk</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-kafka-io</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <!-- Protobuf -->
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>${protobuf.version}</version>
-        </dependency>
-
-        <!-- logging -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.15</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>1.2.17</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- End-to-end test common -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-e2e-tests-common</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- Testcontainers KafkaContainer -->
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>kafka</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>com.github.os72</groupId>
-                <artifactId>protoc-jar-maven-plugin</artifactId>
-                <version>${protoc-jar-maven-plugin.version}</version>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-javadoc-plugin</artifactId>
-                <configuration>
-                    <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java
deleted file mode 100644
index 6e3c07e..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-
-final class Constants {
-
-  private Constants() {}
-
-  static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
-
-  static final EgressIdentifier<MessageWithAddress> EGRESS_ID =
-      new EgressIdentifier<>(
-          "org.apache.flink.e2e.routablekafka", "tagged-messages", MessageWithAddress.class);
-
-  static final String FUNCTION_NAMESPACE = "org.apache.flink.e2e.routablekafka";
-  static final FunctionType FUNCTION_TYPE_ONE = new FunctionType(FUNCTION_NAMESPACE, "t0");
-  static final FunctionType FUNCTION_TYPE_TWO = new FunctionType(FUNCTION_NAMESPACE, "t1");
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java
deleted file mode 100644
index bec943e..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import com.google.protobuf.Any;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.FnAddress;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.Address;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public final class FnSelfAddressTagger implements StatefulFunction {
-
-  @Override
-  public void invoke(Context context, Object input) {
-    MessageWithAddress message = cast(input);
-    context.send(Constants.EGRESS_ID, tagWithSelfAddress(message, context));
-  }
-
-  private static MessageWithAddress cast(Object input) {
-    Any any = (Any) input;
-    try {
-      return any.unpack(MessageWithAddress.class);
-    } catch (Exception e) {
-      throw new RuntimeException("Unable to unpack input as MessageWithAddress.", e);
-    }
-  }
-
-  private MessageWithAddress tagWithSelfAddress(MessageWithAddress original, Context context) {
-    return original.toBuilder().setFrom(fnAddress(context.self())).build();
-  }
-
-  private FnAddress fnAddress(Address sdkAddress) {
-    return FnAddress.newBuilder()
-        .setNamespace(sdkAddress.type().namespace())
-        .setType(sdkAddress.type().name())
-        .setId(sdkAddress.id())
-        .build();
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java
deleted file mode 100644
index 59e3128..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import java.util.Objects;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.io.EgressSpec;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-final class KafkaIO {
-
-  static final String TAGGED_MESSAGES_TOPIC_NAME = "tagged-messages";
-
-  private final String kafkaAddress;
-
-  KafkaIO(String kafkaAddress) {
-    this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
-  }
-
-  EgressSpec<MessageWithAddress> getEgressSpec() {
-    return KafkaEgressBuilder.forIdentifier(Constants.EGRESS_ID)
-        .withKafkaAddress(kafkaAddress)
-        .withSerializer(TaggedMessageKafkaSerializer.class)
-        .build();
-  }
-
-  private static final class TaggedMessageKafkaSerializer
-      implements KafkaEgressSerializer<MessageWithAddress> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public ProducerRecord<byte[], byte[]> serialize(MessageWithAddress taggedMessages) {
-      final byte[] key = taggedMessages.getFrom().getIdBytes().toByteArray();
-      final byte[] value = taggedMessages.toByteArray();
-
-      return new ProducerRecord<>(TAGGED_MESSAGES_TOPIC_NAME, key, value);
-    }
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java
deleted file mode 100644
index d9896c8..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-
-/**
- * This is a a simple application used for testing the routable Kafka ingress.
- *
- * <p>The application reads untagged messages from a Kafka ingress, binds multiple functions which
- * has the sole purpose of tagging of messages with their own addresses (see {@link
- * FnSelfAddressTagger}), and then sending back the tagged messages back to a Kafka egress.
- */
-@AutoService(StatefulFunctionModule.class)
-public class RoutableKafkaVerificationModule implements StatefulFunctionModule {
-
-  @Override
-  public void configure(Map<String, String> globalConfiguration, Binder binder) {
-    String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
-    if (kafkaBootstrapServers == null) {
-      throw new IllegalStateException(
-          "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
-    }
-
-    configureKafkaIO(kafkaBootstrapServers, binder);
-    configureAddressTaggerFunctions(binder);
-  }
-
-  private static void configureKafkaIO(String kafkaAddress, Binder binder) {
-    final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
-    binder.bindEgress(kafkaIO.getEgressSpec());
-  }
-
-  private static void configureAddressTaggerFunctions(Binder binder) {
-    binder.bindFunctionProvider(Constants.FUNCTION_TYPE_ONE, ignored -> new FnSelfAddressTagger());
-    binder.bindFunctionProvider(Constants.FUNCTION_TYPE_TWO, ignored -> new FnSelfAddressTagger());
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto
deleted file mode 100644
index a55186b..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = "proto3";
-
-package org.apache.flink.statefun.e2e.routablekafka;
-option java_package = "org.apache.flink.statefun.e2e.routablekafka.generated";
-option java_multiple_files = false;
-
-/*
- * A command is addressed to a specific target funcction and triggers some action by that target.
- * Commands can be nested to an arbitrary depth.
- */
-message MessageWithAddress {
-    FnAddress from = 1;
-    string message = 2;
-}
-
-/*
- * Target function address of commands.
- */
-message FnAddress {
-    string namespace = 1;
-    string type = 2;
-    string id = 3;
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
deleted file mode 100644
index 3869163..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import java.util.Collections;
-import java.util.Properties;
-import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.FnAddress;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-
-/**
- * End-to-end test based on the {@link RoutableKafkaVerificationModule} application.
- *
- * <p>This test writes some records to Kafka, with target function id as key (UTF8 String) and
- * {@link MessageWithAddress} messages as value, without the {@code from} field set. The routable
- * Kafka ingress should automatically route them to the correct function instances, which tag the
- * input messages with their own address, and then forwards it back to Kafka (see {@link
- * FnSelfAddressTagger} function). The test verifies that the tagged outputs written back to Kafka
- * are correct.
- */
-public class RoutableKafkaE2E {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RoutableKafkaE2E.class);
-
-  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
-  private static final String KAFKA_HOST = "kafka-broker";
-
-  @Rule
-  public KafkaContainer kafka =
-      new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
-
-  @Rule
-  public StatefulFunctionsAppContainers verificationApp =
-      StatefulFunctionsAppContainers.builder("routable-kafka-verification", 1)
-          .dependsOn(kafka)
-          .exposeMasterLogs(LOG)
-          .withBuildContextFileFromClasspath(
-              "routable-kafka-ingress-module", "/routable-kafka-ingress-module/")
-          .withModuleGlobalConfiguration(
-              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
-          .build();
-
-  @Test(timeout = 60_000L)
-  public void run() {
-    final String kafkaAddress = kafka.getBootstrapServers();
-
-    final Producer<String, MessageWithAddress> messageProducer =
-        kafkaKeyedMessagesProducer(kafkaAddress);
-    final Consumer<String, MessageWithAddress> taggedMessageConsumer =
-        kafkaTaggedMessagesConsumer(kafkaAddress);
-
-    final KafkaIOVerifier<String, MessageWithAddress, String, MessageWithAddress> verifier =
-        new KafkaIOVerifier<>(messageProducer, taggedMessageConsumer);
-
-    assertThat(
-        verifier.sending(
-            producerRecord("messages-1", "key-1", message("foo")),
-            producerRecord("messages-1", "key-2", message("bar")),
-            producerRecord("messages-2", "key-1", message("hello"))),
-        verifier.resultsInOrder(
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t0", "key-1"), "foo")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-1"), "foo")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t0", "key-2"), "bar")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-2"), "bar")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-1"), "hello"))));
-  }
-
-  private static Producer<String, MessageWithAddress> kafkaKeyedMessagesProducer(
-      String bootstrapServers) {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", bootstrapServers);
-
-    return new KafkaProducer<>(
-        props, new StringSerializer(), new KafkaProtobufSerializer<>(MessageWithAddress.parser()));
-  }
-
-  private Consumer<String, MessageWithAddress> kafkaTaggedMessagesConsumer(
-      String bootstrapServers) {
-    Properties consumerProps = new Properties();
-    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
-    consumerProps.setProperty("group.id", "routable-kafka");
-    consumerProps.setProperty("auto.offset.reset", "earliest");
-
-    KafkaConsumer<String, MessageWithAddress> consumer =
-        new KafkaConsumer<>(
-            consumerProps,
-            new StringDeserializer(),
-            new KafkaProtobufSerializer<>(MessageWithAddress.parser()));
-    consumer.subscribe(Collections.singletonList(KafkaIO.TAGGED_MESSAGES_TOPIC_NAME));
-
-    return consumer;
-  }
-
-  private static ProducerRecord<String, MessageWithAddress> producerRecord(
-      String topic, String key, MessageWithAddress message) {
-    return new ProducerRecord<>(topic, key, message);
-  }
-
-  private static MessageWithAddress message(String message) {
-    return MessageWithAddress.newBuilder().setMessage(message).build();
-  }
-
-  private static MessageWithAddress taggedMessage(FnAddress fromTag, String message) {
-    return MessageWithAddress.newBuilder().setFrom(fromTag).setMessage(message).build();
-  }
-
-  private static FnAddress fnAddress(String namespace, String type, String id) {
-    return FnAddress.newBuilder().setNamespace(namespace).setType(type).setId(id).build();
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile
deleted file mode 100644
index 66d5051..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM flink-statefun:2.0-SNAPSHOT
-
-RUN mkdir -p /opt/statefun/modules/statefun-routable-kafka-e2e
-COPY statefun-routable-kafka-e2e*.jar /opt/statefun/modules/statefun-routable-kafka-e2e/
-COPY routable-kafka-ingress-module/ /opt/statefun/modules/statefun-routable-kafka-e2e/
-COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties
deleted file mode 100644
index fb965d3..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml
deleted file mode 100644
index 98fee69..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: "1.0"
-
-module:
-  meta:
-    type: remote
-  spec:
-    ingresses:
-      - ingress:
-          meta:
-            type: statefun.kafka.io/routable-protobuf-ingress
-            id: org.apache.flink.statefun.e2e/messages
-          spec:
-            address: kafka-broker:9092
-            consumerGroupId: routable-kafka-e2e
-            startupPosition:
-              type: earliest
-            topics:
-              - topic: messages-1
-                typeUrl: com.googleapis/org.apache.flink.statefun.e2e.routablekafka.MessageWithAddress
-                targets:
-                  - org.apache.flink.e2e.routablekafka/t0
-                  - org.apache.flink.e2e.routablekafka/t1
-              - topic: messages-2
-                typeUrl: com.googleapis/org.apache.flink.statefun.e2e.routablekafka.MessageWithAddress
-                targets:
-                  - org.apache.flink.e2e.routablekafka/t1


[flink-statefun] 01/08: [FLINK-17518] [python] Make Python SDK build script oblivious of working directory

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit f9d5c7a83f389a5784aecb4e65f104428ecee2ff
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:24:34 2020 +0800

    [FLINK-17518] [python] Make Python SDK build script oblivious of working directory
---
 statefun-python-sdk/build-distribution.sh | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/statefun-python-sdk/build-distribution.sh b/statefun-python-sdk/build-distribution.sh
index e68363e..7f7eb31 100755
--- a/statefun-python-sdk/build-distribution.sh
+++ b/statefun-python-sdk/build-distribution.sh
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/bin/bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -15,10 +15,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+CURR_DIR=`pwd`
+BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
+
+###########################
+
+cd ${BASE_DIR}
+
 rm -fr dist
 
 docker run \
-	-v "$(pwd):/app" \
+	-v "$BASE_DIR:/app" \
 	--workdir /app \
 	-i  python:3.7-alpine \
 	python3 setup.py sdist bdist_wheel
@@ -26,4 +33,6 @@ docker run \
 rm -fr apache_flink_statefun.egg-info
 rm -fr build
 
+echo "Built Python SDK wheels and packages at ${BASE_DIR}/dist."
 
+cd ${CURR_DIR}