You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/21 07:02:48 UTC

[flink-statefun] branch master updated (1c018e1 -> 6bdbc25)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 1c018e1  [FLINK-17611] [core] Add an ITCase for Unix domain sockets
     new f114019  [FLINK-17518] [python] Make Python SDK build script oblivious of working directory
     new bd06c18  [FLINK-17518] [build] Add a root.dir property to Maven
     new ae1e4ce  [FLINK-17518] [e2e] Build Python SDK with -Prun-e2e-tests
     new 6d204fe  [FLINK-17518] [e2e] Add resultsInAnyOrder to KafkaIOVerifier
     new bae971d  [FLINK-17518] [e2e] Add remote functions for remote module E2E
     new 60d93d6  [FLINK-17518] [e2e] Add remote module E2E
     new 2abbff1  [FLINK-17518] [e2e] Remove routable Kafka E2E test
     new 6bdbc25  [hotfix] [build] Fix path to spotbugs exclude filter file

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |  21 ++-
 statefun-e2e-tests/pom.xml                         |  20 ++-
 .../statefun/e2e/common/kafka/KafkaIOVerifier.java |  53 +++++++
 .../pom.xml                                        |  23 +--
 .../main/protobuf/remote-module-verification.proto |  27 ++--
 .../src/main/python/functions.py                   |  97 ++++++++++++
 .../main/python/remote_module_verification_pb2.py  | 149 ++++++++++++++++++
 .../flink/statefun/e2e/remote/RemoteModuleE2E.java | 167 +++++++++++++++++++++
 .../src/test/resources/Dockerfile                  |   4 +-
 .../src/test/resources/Dockerfile.remote-function  |  10 +-
 .../src/test/resources/log4j.properties            |   0
 .../src/test/resources/remote-module}/module.yaml  |  39 +++--
 .../src/test/resources}/requirements.txt           |   1 -
 .../statefun/e2e/routablekafka/Constants.java      |  38 -----
 .../e2e/routablekafka/FnSelfAddressTagger.java     |  56 -------
 .../flink/statefun/e2e/routablekafka/KafkaIO.java  |  58 -------
 .../RoutableKafkaVerificationModule.java           |  56 -------
 .../protobuf/routable-kafka-verification.proto     |  41 -----
 .../e2e/routablekafka/RoutableKafkaE2E.java        | 143 ------------------
 .../src/test/resources/Dockerfile                  |  21 ---
 statefun-python-sdk/build-distribution.sh          |  13 +-
 21 files changed, 572 insertions(+), 465 deletions(-)
 rename statefun-e2e-tests/{statefun-routable-kafka-e2e => statefun-remote-module-e2e}/pom.xml (86%)
 copy statefun-examples/statefun-python-walkthrough-example/walkthrough.proto => statefun-e2e-tests/statefun-remote-module-e2e/src/main/protobuf/remote-module-verification.proto (74%)
 create mode 100644 statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/functions.py
 create mode 100644 statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/remote_module_verification_pb2.py
 create mode 100644 statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java
 copy statefun-e2e-tests/{statefun-sanity-e2e => statefun-remote-module-e2e}/src/test/resources/Dockerfile (86%)
 copy statefun-examples/statefun-python-k8s-example/Dockerfile.python-worker => statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile.remote-function (77%)
 rename statefun-e2e-tests/{statefun-routable-kafka-e2e => statefun-remote-module-e2e}/src/test/resources/log4j.properties (100%)
 rename statefun-e2e-tests/{statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module => statefun-remote-module-e2e/src/test/resources/remote-module}/module.yaml (54%)
 copy {statefun-examples/statefun-python-greeter-example/greeter => statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources}/requirements.txt (97%)
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
 delete mode 100644 statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile


[flink-statefun] 03/08: [FLINK-17518] [e2e] Build Python SDK with -Prun-e2e-tests

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit ae1e4ce2811d5feaf16cc7cab0bac9f8a0b9d4e6
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:25:33 2020 +0800

    [FLINK-17518] [e2e] Build Python SDK with -Prun-e2e-tests
---
 statefun-e2e-tests/pom.xml | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 13f2178..5fe37c1 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -63,16 +63,28 @@ under the License.
                         <version>1.6.0</version>
                         <executions>
                             <execution>
+                                <id>build-statefun-base-image</id>
+                                <goals>
+                                    <goal>exec</goal>
+                                </goals>
+                                <inherited>false</inherited>
                                 <phase>pre-integration-test</phase>
+                                <configuration>
+                                    <executable>${root.dir}/tools/docker/build-stateful-functions.sh</executable>
+                                </configuration>
+                            </execution>
+                            <execution>
+                                <id>build-python-sdk-packages</id>
                                 <goals>
                                     <goal>exec</goal>
                                 </goals>
                                 <inherited>false</inherited>
+                                <phase>pre-integration-test</phase>
+                                <configuration>
+                                    <executable>${root.dir}/statefun-python-sdk/build-distribution.sh</executable>
+                                </configuration>
                             </execution>
                         </executions>
-                        <configuration>
-                            <executable>${user.dir}/tools/docker/build-stateful-functions.sh</executable>
-                        </configuration>
                     </plugin>
                     <plugin>
                         <artifactId>maven-failsafe-plugin</artifactId>


[flink-statefun] 07/08: [FLINK-17518] [e2e] Remove routable Kafka E2E test

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 2abbff1b0036fa3099cccdb13330e97078bd4d97
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:32:26 2020 +0800

    [FLINK-17518] [e2e] Remove routable Kafka E2E test
    
    This E2E can be removed since it is completely subsumed by the new
    remote module E2E.
    
    This closes #115.
---
 statefun-e2e-tests/pom.xml                         |   1 -
 .../statefun-routable-kafka-e2e/pom.xml            | 102 ---------------
 .../statefun/e2e/routablekafka/Constants.java      |  38 ------
 .../e2e/routablekafka/FnSelfAddressTagger.java     |  56 --------
 .../flink/statefun/e2e/routablekafka/KafkaIO.java  |  58 ---------
 .../RoutableKafkaVerificationModule.java           |  56 --------
 .../protobuf/routable-kafka-verification.proto     |  41 ------
 .../e2e/routablekafka/RoutableKafkaE2E.java        | 143 ---------------------
 .../src/test/resources/Dockerfile                  |  21 ---
 .../src/test/resources/log4j.properties            |  24 ----
 .../routable-kafka-ingress-module/module.yaml      |  41 ------
 11 files changed, 581 deletions(-)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 9456ed1..ebb297e 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -31,7 +31,6 @@ under the License.
     <modules>
         <module>statefun-e2e-tests-common</module>
         <module>statefun-sanity-e2e</module>
-        <module>statefun-routable-kafka-e2e</module>
         <module>statefun-remote-module-e2e</module>
         <module>statefun-exactly-once-e2e</module>
     </modules>
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml b/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml
deleted file mode 100644
index 7d7d4a4..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml
+++ /dev/null
@@ -1,102 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xmlns="http://maven.apache.org/POM/4.0.0"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>statefun-e2e-tests</artifactId>
-        <groupId>org.apache.flink</groupId>
-        <version>2.1-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>statefun-routable-kafka-e2e</artifactId>
-
-    <properties>
-        <testcontainers.version>1.12.5</testcontainers.version>
-    </properties>
-
-    <dependencies>
-        <!-- Stateful Functions -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-sdk</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-kafka-io</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <!-- Protobuf -->
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>${protobuf.version}</version>
-        </dependency>
-
-        <!-- logging -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.15</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>1.2.17</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- End-to-end test common -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-e2e-tests-common</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- Testcontainers KafkaContainer -->
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>kafka</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>com.github.os72</groupId>
-                <artifactId>protoc-jar-maven-plugin</artifactId>
-                <version>${protoc-jar-maven-plugin.version}</version>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-javadoc-plugin</artifactId>
-                <configuration>
-                    <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java
deleted file mode 100644
index 6e3c07e..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-
-final class Constants {
-
-  private Constants() {}
-
-  static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
-
-  static final EgressIdentifier<MessageWithAddress> EGRESS_ID =
-      new EgressIdentifier<>(
-          "org.apache.flink.e2e.routablekafka", "tagged-messages", MessageWithAddress.class);
-
-  static final String FUNCTION_NAMESPACE = "org.apache.flink.e2e.routablekafka";
-  static final FunctionType FUNCTION_TYPE_ONE = new FunctionType(FUNCTION_NAMESPACE, "t0");
-  static final FunctionType FUNCTION_TYPE_TWO = new FunctionType(FUNCTION_NAMESPACE, "t1");
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java
deleted file mode 100644
index bec943e..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import com.google.protobuf.Any;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.FnAddress;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.Address;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public final class FnSelfAddressTagger implements StatefulFunction {
-
-  @Override
-  public void invoke(Context context, Object input) {
-    MessageWithAddress message = cast(input);
-    context.send(Constants.EGRESS_ID, tagWithSelfAddress(message, context));
-  }
-
-  private static MessageWithAddress cast(Object input) {
-    Any any = (Any) input;
-    try {
-      return any.unpack(MessageWithAddress.class);
-    } catch (Exception e) {
-      throw new RuntimeException("Unable to unpack input as MessageWithAddress.", e);
-    }
-  }
-
-  private MessageWithAddress tagWithSelfAddress(MessageWithAddress original, Context context) {
-    return original.toBuilder().setFrom(fnAddress(context.self())).build();
-  }
-
-  private FnAddress fnAddress(Address sdkAddress) {
-    return FnAddress.newBuilder()
-        .setNamespace(sdkAddress.type().namespace())
-        .setType(sdkAddress.type().name())
-        .setId(sdkAddress.id())
-        .build();
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java
deleted file mode 100644
index 59e3128..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import java.util.Objects;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.io.EgressSpec;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-final class KafkaIO {
-
-  static final String TAGGED_MESSAGES_TOPIC_NAME = "tagged-messages";
-
-  private final String kafkaAddress;
-
-  KafkaIO(String kafkaAddress) {
-    this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
-  }
-
-  EgressSpec<MessageWithAddress> getEgressSpec() {
-    return KafkaEgressBuilder.forIdentifier(Constants.EGRESS_ID)
-        .withKafkaAddress(kafkaAddress)
-        .withSerializer(TaggedMessageKafkaSerializer.class)
-        .build();
-  }
-
-  private static final class TaggedMessageKafkaSerializer
-      implements KafkaEgressSerializer<MessageWithAddress> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public ProducerRecord<byte[], byte[]> serialize(MessageWithAddress taggedMessages) {
-      final byte[] key = taggedMessages.getFrom().getIdBytes().toByteArray();
-      final byte[] value = taggedMessages.toByteArray();
-
-      return new ProducerRecord<>(TAGGED_MESSAGES_TOPIC_NAME, key, value);
-    }
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java
deleted file mode 100644
index d9896c8..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-
-/**
- * This is a a simple application used for testing the routable Kafka ingress.
- *
- * <p>The application reads untagged messages from a Kafka ingress, binds multiple functions which
- * has the sole purpose of tagging of messages with their own addresses (see {@link
- * FnSelfAddressTagger}), and then sending back the tagged messages back to a Kafka egress.
- */
-@AutoService(StatefulFunctionModule.class)
-public class RoutableKafkaVerificationModule implements StatefulFunctionModule {
-
-  @Override
-  public void configure(Map<String, String> globalConfiguration, Binder binder) {
-    String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
-    if (kafkaBootstrapServers == null) {
-      throw new IllegalStateException(
-          "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
-    }
-
-    configureKafkaIO(kafkaBootstrapServers, binder);
-    configureAddressTaggerFunctions(binder);
-  }
-
-  private static void configureKafkaIO(String kafkaAddress, Binder binder) {
-    final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
-    binder.bindEgress(kafkaIO.getEgressSpec());
-  }
-
-  private static void configureAddressTaggerFunctions(Binder binder) {
-    binder.bindFunctionProvider(Constants.FUNCTION_TYPE_ONE, ignored -> new FnSelfAddressTagger());
-    binder.bindFunctionProvider(Constants.FUNCTION_TYPE_TWO, ignored -> new FnSelfAddressTagger());
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto
deleted file mode 100644
index a55186b..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = "proto3";
-
-package org.apache.flink.statefun.e2e.routablekafka;
-option java_package = "org.apache.flink.statefun.e2e.routablekafka.generated";
-option java_multiple_files = false;
-
-/*
- * A command is addressed to a specific target funcction and triggers some action by that target.
- * Commands can be nested to an arbitrary depth.
- */
-message MessageWithAddress {
-    FnAddress from = 1;
-    string message = 2;
-}
-
-/*
- * Target function address of commands.
- */
-message FnAddress {
-    string namespace = 1;
-    string type = 2;
-    string id = 3;
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
deleted file mode 100644
index 3869163..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import java.util.Collections;
-import java.util.Properties;
-import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.FnAddress;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-
-/**
- * End-to-end test based on the {@link RoutableKafkaVerificationModule} application.
- *
- * <p>This test writes some records to Kafka, with target function id as key (UTF8 String) and
- * {@link MessageWithAddress} messages as value, without the {@code from} field set. The routable
- * Kafka ingress should automatically route them to the correct function instances, which tag the
- * input messages with their own address, and then forwards it back to Kafka (see {@link
- * FnSelfAddressTagger} function). The test verifies that the tagged outputs written back to Kafka
- * are correct.
- */
-public class RoutableKafkaE2E {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RoutableKafkaE2E.class);
-
-  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
-  private static final String KAFKA_HOST = "kafka-broker";
-
-  @Rule
-  public KafkaContainer kafka =
-      new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
-
-  @Rule
-  public StatefulFunctionsAppContainers verificationApp =
-      StatefulFunctionsAppContainers.builder("routable-kafka-verification", 1)
-          .dependsOn(kafka)
-          .exposeMasterLogs(LOG)
-          .withBuildContextFileFromClasspath(
-              "routable-kafka-ingress-module", "/routable-kafka-ingress-module/")
-          .withModuleGlobalConfiguration(
-              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
-          .build();
-
-  @Test(timeout = 60_000L)
-  public void run() {
-    final String kafkaAddress = kafka.getBootstrapServers();
-
-    final Producer<String, MessageWithAddress> messageProducer =
-        kafkaKeyedMessagesProducer(kafkaAddress);
-    final Consumer<String, MessageWithAddress> taggedMessageConsumer =
-        kafkaTaggedMessagesConsumer(kafkaAddress);
-
-    final KafkaIOVerifier<String, MessageWithAddress, String, MessageWithAddress> verifier =
-        new KafkaIOVerifier<>(messageProducer, taggedMessageConsumer);
-
-    assertThat(
-        verifier.sending(
-            producerRecord("messages-1", "key-1", message("foo")),
-            producerRecord("messages-1", "key-2", message("bar")),
-            producerRecord("messages-2", "key-1", message("hello"))),
-        verifier.resultsInOrder(
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t0", "key-1"), "foo")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-1"), "foo")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t0", "key-2"), "bar")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-2"), "bar")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-1"), "hello"))));
-  }
-
-  private static Producer<String, MessageWithAddress> kafkaKeyedMessagesProducer(
-      String bootstrapServers) {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", bootstrapServers);
-
-    return new KafkaProducer<>(
-        props, new StringSerializer(), new KafkaProtobufSerializer<>(MessageWithAddress.parser()));
-  }
-
-  private Consumer<String, MessageWithAddress> kafkaTaggedMessagesConsumer(
-      String bootstrapServers) {
-    Properties consumerProps = new Properties();
-    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
-    consumerProps.setProperty("group.id", "routable-kafka");
-    consumerProps.setProperty("auto.offset.reset", "earliest");
-
-    KafkaConsumer<String, MessageWithAddress> consumer =
-        new KafkaConsumer<>(
-            consumerProps,
-            new StringDeserializer(),
-            new KafkaProtobufSerializer<>(MessageWithAddress.parser()));
-    consumer.subscribe(Collections.singletonList(KafkaIO.TAGGED_MESSAGES_TOPIC_NAME));
-
-    return consumer;
-  }
-
-  private static ProducerRecord<String, MessageWithAddress> producerRecord(
-      String topic, String key, MessageWithAddress message) {
-    return new ProducerRecord<>(topic, key, message);
-  }
-
-  private static MessageWithAddress message(String message) {
-    return MessageWithAddress.newBuilder().setMessage(message).build();
-  }
-
-  private static MessageWithAddress taggedMessage(FnAddress fromTag, String message) {
-    return MessageWithAddress.newBuilder().setFrom(fromTag).setMessage(message).build();
-  }
-
-  private static FnAddress fnAddress(String namespace, String type, String id) {
-    return FnAddress.newBuilder().setNamespace(namespace).setType(type).setId(id).build();
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile
deleted file mode 100644
index 7210f89..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM flink-statefun:2.1-SNAPSHOT
-
-RUN mkdir -p /opt/statefun/modules/statefun-routable-kafka-e2e
-COPY statefun-routable-kafka-e2e*.jar /opt/statefun/modules/statefun-routable-kafka-e2e/
-COPY routable-kafka-ingress-module/ /opt/statefun/modules/statefun-routable-kafka-e2e/
-COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties
deleted file mode 100644
index fb965d3..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml
deleted file mode 100644
index 98fee69..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: "1.0"
-
-module:
-  meta:
-    type: remote
-  spec:
-    ingresses:
-      - ingress:
-          meta:
-            type: statefun.kafka.io/routable-protobuf-ingress
-            id: org.apache.flink.statefun.e2e/messages
-          spec:
-            address: kafka-broker:9092
-            consumerGroupId: routable-kafka-e2e
-            startupPosition:
-              type: earliest
-            topics:
-              - topic: messages-1
-                typeUrl: com.googleapis/org.apache.flink.statefun.e2e.routablekafka.MessageWithAddress
-                targets:
-                  - org.apache.flink.e2e.routablekafka/t0
-                  - org.apache.flink.e2e.routablekafka/t1
-              - topic: messages-2
-                typeUrl: com.googleapis/org.apache.flink.statefun.e2e.routablekafka.MessageWithAddress
-                targets:
-                  - org.apache.flink.e2e.routablekafka/t1


[flink-statefun] 01/08: [FLINK-17518] [python] Make Python SDK build script oblivious of working directory

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit f114019de590f82bf3b7e87039f1ba1d635454e9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:24:34 2020 +0800

    [FLINK-17518] [python] Make Python SDK build script oblivious of working directory
---
 statefun-python-sdk/build-distribution.sh | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/statefun-python-sdk/build-distribution.sh b/statefun-python-sdk/build-distribution.sh
index e68363e..7f7eb31 100755
--- a/statefun-python-sdk/build-distribution.sh
+++ b/statefun-python-sdk/build-distribution.sh
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/bin/bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -15,10 +15,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+CURR_DIR=`pwd`
+BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
+
+###########################
+
+cd ${BASE_DIR}
+
 rm -fr dist
 
 docker run \
-	-v "$(pwd):/app" \
+	-v "$BASE_DIR:/app" \
 	--workdir /app \
 	-i  python:3.7-alpine \
 	python3 setup.py sdist bdist_wheel
@@ -26,4 +33,6 @@ docker run \
 rm -fr apache_flink_statefun.egg-info
 rm -fr build
 
+echo "Built Python SDK wheels and packages at ${BASE_DIR}/dist."
 
+cd ${CURR_DIR}


[flink-statefun] 04/08: [FLINK-17518] [e2e] Add resultsInAnyOrder to KafkaIOVerifier

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 6d204fe6d7d0bb1647d2acab22327fd4e5421e6b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:26:24 2020 +0800

    [FLINK-17518] [e2e] Add resultsInAnyOrder to KafkaIOVerifier
    
    This commit adds an additional kind of matcher that matches Kafka
    outputs in any order that they are consumed.
---
 .../statefun/e2e/common/kafka/KafkaIOVerifier.java | 53 ++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/kafka/KafkaIOVerifier.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/kafka/KafkaIOVerifier.java
index cea79b5..c70dfc6 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/kafka/KafkaIOVerifier.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/kafka/KafkaIOVerifier.java
@@ -19,6 +19,10 @@
 package org.apache.flink.statefun.e2e.common.kafka;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -147,6 +151,44 @@ public final class KafkaIOVerifier<PK, PV, CK, CV> {
     };
   }
 
+  /**
+   * Matcher for verifying the outputs, happening in any order, as a result of calling {@link
+   * #sending(ProducerRecord[])}.
+   *
+   * @param expectedResults matchers for the expected results.
+   * @return a matcher for verifying the output of calling {@link #sending(ProducerRecord[])}.
+   */
+  @SafeVarargs
+  public final Matcher<OutputsHandoff<CV>> resultsInAnyOrder(Matcher<CV>... expectedResults) {
+    return new TypeSafeMatcher<OutputsHandoff<CV>>() {
+      @Override
+      protected boolean matchesSafely(OutputsHandoff<CV> outputHandoff) {
+        final List<Matcher<CV>> expectedResultsList =
+            new ArrayList<>(Arrays.asList(expectedResults));
+
+        try {
+          while (!expectedResultsList.isEmpty()) {
+            CV output = outputHandoff.take();
+            if (!checkAndRemoveIfMatch(expectedResultsList, output)) {
+              return false;
+            }
+          }
+
+          // any dangling unexpected output should count as a mismatch
+          // TODO should we poll with timeout for a stronger verification?
+          return outputHandoff.peek() == null;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        } finally {
+          outputHandoff.verified();
+        }
+      }
+
+      @Override
+      public void describeTo(Description description) {}
+    };
+  }
+
   private static final class OutputsHandoff<T> extends LinkedBlockingQueue<T> {
 
     private static final long serialVersionUID = 1L;
@@ -161,4 +203,15 @@ public final class KafkaIOVerifier<PK, PV, CK, CV> {
       this.isVerified = true;
     }
   }
+
+  private static <CV> boolean checkAndRemoveIfMatch(List<Matcher<CV>> expectedResultsList, CV in) {
+    final Iterator<Matcher<CV>> matchersIterator = expectedResultsList.iterator();
+    while (matchersIterator.hasNext()) {
+      if (matchersIterator.next().matches(in)) {
+        matchersIterator.remove();
+        return true;
+      }
+    }
+    return false;
+  }
 }


[flink-statefun] 06/08: [FLINK-17518] [e2e] Add remote module E2E

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 60d93d682a7e0fd7a3b6d53853b5fccb4e5b7269
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:30:38 2020 +0800

    [FLINK-17518] [e2e] Add remote module E2E
---
 .../flink/statefun/e2e/remote/RemoteModuleE2E.java | 167 +++++++++++++++++++++
 .../src/test/resources/Dockerfile                  |  20 +++
 .../src/test/resources/Dockerfile.remote-function  |  34 +++++
 .../src/test/resources/log4j.properties            |  24 +++
 .../src/test/resources/remote-module/module.yaml   |  60 ++++++++
 .../src/test/resources/requirements.txt            |  21 +++
 6 files changed, 326 insertions(+)

diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java
new file mode 100644
index 0000000..618150b
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.remote;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
+import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
+import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
+import org.apache.flink.statefun.e2e.remote.generated.RemoteModuleVerification.Invoke;
+import org.apache.flink.statefun.e2e.remote.generated.RemoteModuleVerification.InvokeResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+/**
+ * End-to-end test for a completely YAML-based remote module setup.consisting of
+ *
+ * <p>The setup consists of a auto-routable YAML Kafka ingress, the generic YAML Kafka egress, and
+ * two Python. remote functions: 1) a simple invocation counter function, which gets routed invoke
+ * messages from the auto-routable Kafka ingress, and 2) a simple stateless forwarding. function,
+ * which gets the invocation counts from the counter function and simply forwards them to the Kafka
+ * egress.
+ *
+ * <p>We perform the extra stateless forwarding so that the E2E test scenario covers messaging
+ * between remote functions.
+ */
+public class RemoteModuleE2E {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteModuleE2E.class);
+
+  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
+
+  private static final String KAFKA_HOST = "kafka-broker";
+  private static final String INVOKE_TOPIC = "invoke";
+  private static final String INVOKE_RESULTS_TOPIC = "invoke-results";
+
+  private static final String REMOTE_FUNCTION_HOST = "remote-function";
+
+  @Rule
+  public KafkaContainer kafka =
+      new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
+
+  @Rule
+  public GenericContainer<?> remoteFunction =
+      new GenericContainer<>(remoteFunctionImage())
+          .withNetworkAliases(REMOTE_FUNCTION_HOST)
+          .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+  @Rule
+  public StatefulFunctionsAppContainers verificationApp =
+      StatefulFunctionsAppContainers.builder("remote-module-verification", 2)
+          .dependsOn(kafka)
+          .dependsOn(remoteFunction)
+          .exposeMasterLogs(LOG)
+          .withBuildContextFileFromClasspath("remote-module", "/remote-module/")
+          .build();
+
+  @Test
+  public void run() {
+    final String kafkaAddress = kafka.getBootstrapServers();
+
+    final Producer<String, Invoke> invokeProducer = kafkaKeyedInvokesProducer(kafkaAddress);
+    final Consumer<String, InvokeResult> invokeResultConsumer =
+        kafkaInvokeResultsConsumer(kafkaAddress);
+
+    final KafkaIOVerifier<String, Invoke, String, InvokeResult> verifier =
+        new KafkaIOVerifier<>(invokeProducer, invokeResultConsumer);
+
+    // we verify results come in any order, since the results from the counter function are
+    // being forwarded to the forwarding function with a random key, and therefore
+    // might be written to Kafka out-of-order. We specifically use random keys there
+    // so that the E2E may cover both local handovers and cross-partition messaging via the
+    // feedback loop in the remote module setup.
+    assertThat(
+        verifier.sending(invoke("foo"), invoke("foo"), invoke("bar")),
+        verifier.resultsInAnyOrder(
+            is(invokeResult("foo", 1)), is(invokeResult("foo", 2)), is(invokeResult("bar", 1))));
+  }
+
+  private static ImageFromDockerfile remoteFunctionImage() {
+    final Path pythonSourcePath = remoteFunctionPythonSourcePath();
+    LOG.info("Building remote function image with Python source at: {}", pythonSourcePath);
+
+    final Path pythonSdkPath = pythonSdkPath();
+    LOG.info("Located built Python SDK at: {}", pythonSdkPath);
+
+    return new ImageFromDockerfile("remote-function")
+        .withFileFromClasspath("Dockerfile", "Dockerfile.remote-function")
+        .withFileFromPath("source/", pythonSourcePath)
+        .withFileFromClasspath("requirements.txt", "requirements.txt")
+        .withFileFromPath("python-sdk/", pythonSdkPath);
+  }
+
+  private static Path remoteFunctionPythonSourcePath() {
+    return Paths.get(System.getProperty("user.dir") + "/src/main/python");
+  }
+
+  private static Path pythonSdkPath() {
+    return Paths.get(System.getProperty("user.dir") + "/../../statefun-python-sdk/dist");
+  }
+
+  private static Producer<String, Invoke> kafkaKeyedInvokesProducer(String bootstrapServers) {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", bootstrapServers);
+
+    return new KafkaProducer<>(
+        props, new StringSerializer(), new KafkaProtobufSerializer<>(Invoke.parser()));
+  }
+
+  private Consumer<String, InvokeResult> kafkaInvokeResultsConsumer(String bootstrapServers) {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
+    consumerProps.setProperty("group.id", "remote-module-e2e");
+    consumerProps.setProperty("auto.offset.reset", "earliest");
+
+    KafkaConsumer<String, InvokeResult> consumer =
+        new KafkaConsumer<>(
+            consumerProps,
+            new StringDeserializer(),
+            new KafkaProtobufSerializer<>(InvokeResult.parser()));
+    consumer.subscribe(Collections.singletonList(INVOKE_RESULTS_TOPIC));
+
+    return consumer;
+  }
+
+  private static ProducerRecord<String, Invoke> invoke(String target) {
+    return new ProducerRecord<>(INVOKE_TOPIC, target, Invoke.getDefaultInstance());
+  }
+
+  private static InvokeResult invokeResult(String id, int invokeCount) {
+    return InvokeResult.newBuilder().setId(id).setInvokeCount(invokeCount).build();
+  }
+}
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile
new file mode 100644
index 0000000..e3f8b9d
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM flink-statefun:2.1-SNAPSHOT
+
+RUN mkdir -p /opt/statefun/modules/statefun-remote-module-e2e
+COPY remote-module/ /opt/statefun/modules/statefun-remote-module-e2e/
+COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile.remote-function b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile.remote-function
new file mode 100644
index 0000000..7eb3ffe
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/Dockerfile.remote-function
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM python:3.7-alpine
+
+RUN mkdir -p /app
+WORKDIR /app
+
+COPY python-sdk/apache_flink_statefun-*-py3-none-any.whl /app
+RUN pip install apache_flink_statefun-*-py3-none-any.whl
+
+COPY requirements.txt /app
+RUN pip install -r requirements.txt
+
+COPY source/functions.py /app
+COPY source/remote_module_verification_pb2.py /app
+
+EXPOSE 8000
+
+CMD ["gunicorn", "-b", "0.0.0.0:8000", "-w 1", "functions:app"]
+
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb965d3
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/remote-module/module.yaml
new file mode 100644
index 0000000..fc1e57c
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/remote-module/module.yaml
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "1.0"
+
+module:
+  meta:
+    type: remote
+  spec:
+    functions:
+      - function:
+          meta:
+            kind: http
+            type: org.apache.flink.statefun.e2e.remote/counter
+          spec:
+            endpoint: http://remote-function:8000/service
+            states:
+              - invoke_count
+            maxNumBatchRequests: 10000
+      - function:
+          meta:
+            kind: http
+            type: org.apache.flink.statefun.e2e.remote/forward-function
+          spec:
+            endpoint: http://remote-function:8000/service
+            maxNumBatchRequests: 10000
+    ingresses:
+      - ingress:
+          meta:
+            type: statefun.kafka.io/routable-protobuf-ingress
+            id: org.apache.flink.statefun.e2e.remote/invoke
+          spec:
+            address: kafka-broker:9092
+            consumerGroupId: remote-module-e2e
+            startupPosition:
+              type: earliest
+            topics:
+              - topic: invoke
+                typeUrl: com.googleapis/org.apache.flink.statefun.e2e.remote.Invoke
+                targets:
+                  - org.apache.flink.statefun.e2e.remote/counter
+    egresses:
+      - egress:
+          meta:
+            type: statefun.kafka.io/generic-egress
+            id: org.apache.flink.statefun.e2e.remote/invoke-results
+          spec:
+            address: kafka-broker:9092
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/requirements.txt b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/requirements.txt
new file mode 100644
index 0000000..2b89e19
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/requirements.txt
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+flask==1.1.1
+gunicorn==20.0.4
+
+
+


[flink-statefun] 08/08: [hotfix] [build] Fix path to spotbugs exclude filter file

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 6bdbc25924b264f85fc42abd34d96eab6c3e26e3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 21 15:00:14 2020 +0800

    [hotfix] [build] Fix path to spotbugs exclude filter file
    
    This commit fixes the path so that it is absolute, instead of a relative
    path to the root directory. This was causing issues, since developers
    essentially could not build modules individually due to an error
    complaining that the exclude file cannot be located (since the path was
    relative to the root, and only works when building from root).
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 5516230..c25c9cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -219,7 +219,7 @@ under the License.
                 <configuration>
                     <effort>Max</effort>
                     <threshold>Low</threshold>
-                    <excludeFilterFile>tools/maven/spotbugs-exclude.xml</excludeFilterFile>
+                    <excludeFilterFile>${root.dir}/tools/maven/spotbugs-exclude.xml</excludeFilterFile>
                 </configuration>
             </plugin>
 


[flink-statefun] 05/08: [FLINK-17518] [e2e] Add remote functions for remote module E2E

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit bae971d60ec918dba5d68dfdaec02c51fdc55aa4
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:30:02 2020 +0800

    [FLINK-17518] [e2e] Add remote functions for remote module E2E
---
 statefun-e2e-tests/pom.xml                         |   1 +
 .../statefun-remote-module-e2e/pom.xml             | 105 +++++++++++++++
 .../main/protobuf/remote-module-verification.proto |  35 +++++
 .../src/main/python/functions.py                   |  97 ++++++++++++++
 .../main/python/remote_module_verification_pb2.py  | 149 +++++++++++++++++++++
 5 files changed, 387 insertions(+)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 5fe37c1..9456ed1 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -32,6 +32,7 @@ under the License.
         <module>statefun-e2e-tests-common</module>
         <module>statefun-sanity-e2e</module>
         <module>statefun-routable-kafka-e2e</module>
+        <module>statefun-remote-module-e2e</module>
         <module>statefun-exactly-once-e2e</module>
     </modules>
 
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/pom.xml b/statefun-e2e-tests/statefun-remote-module-e2e/pom.xml
new file mode 100644
index 0000000..e904370
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>statefun-e2e-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>2.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-remote-module-e2e</artifactId>
+
+    <properties>
+        <testcontainers.version>1.12.5</testcontainers.version>
+        <kafka.version>2.2.0</kafka.version>
+    </properties>
+
+    <dependencies>
+        <!-- Kafka clients -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Protobuf -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+
+        <!-- logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.15</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- End-to-end test common -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-e2e-tests-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Testcontainers KafkaContainer -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <version>${protoc-jar-maven-plugin.version}</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/main/protobuf/remote-module-verification.proto b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/protobuf/remote-module-verification.proto
new file mode 100644
index 0000000..1cafcf1
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/protobuf/remote-module-verification.proto
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.e2e.remote;
+option java_package = "org.apache.flink.statefun.e2e.remote.generated";
+option java_multiple_files = false;
+
+message Invoke {
+}
+
+message InvokeCount {
+    int32 count = 1;
+}
+
+message InvokeResult {
+    string id = 1;
+    int32 invoke_count = 2;
+}
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/functions.py b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/functions.py
new file mode 100644
index 0000000..be2abe4
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/functions.py
@@ -0,0 +1,97 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from remote_module_verification_pb2 import Invoke, InvokeResult, InvokeCount
+
+from statefun import StatefulFunctions
+from statefun import RequestReplyHandler
+from statefun import kafka_egress_record
+
+import uuid
+
+functions = StatefulFunctions()
+
+
+@functions.bind("org.apache.flink.statefun.e2e.remote/counter")
+def counter(context, invoke: Invoke):
+    """
+    Keeps count of the number of invocations, and forwards that count
+    to be sent to the Kafka egress. We do the extra forwarding instead
+    of directly sending to Kafka, so that we cover inter-function
+    messaging in our E2E test.
+    """
+    invoke_count = context.state('invoke_count').unpack(InvokeCount)
+    if not invoke_count:
+        invoke_count = InvokeCount()
+        invoke_count.count = 1
+    else:
+        invoke_count.count += 1
+    context.state('invoke_count').pack(invoke_count)
+
+    response = InvokeResult()
+    response.id = context.address.identity
+    response.invoke_count = invoke_count.count
+
+    context.pack_and_send(
+        "org.apache.flink.statefun.e2e.remote/forward-function",
+        # use random keys to simulate both local handovers and
+        # cross-partition messaging via the feedback loop
+        uuid.uuid4().hex,
+        response
+    )
+
+
+@functions.bind("org.apache.flink.statefun.e2e.remote/forward-function")
+def forward_to_egress(context, invoke_result: InvokeResult):
+    """
+    Simply forwards the results to the Kafka egress.
+    """
+    egress_message = kafka_egress_record(
+        topic="invoke-results",
+        key=invoke_result.id,
+        value=invoke_result
+    )
+    context.pack_and_send_egress(
+        "org.apache.flink.statefun.e2e.remote/invoke-results",
+        egress_message
+    )
+
+
+handler = RequestReplyHandler(functions)
+
+#
+# Serve the endpoint
+#
+
+from flask import request
+from flask import make_response
+from flask import Flask
+
+app = Flask(__name__)
+
+
+@app.route('/service', methods=['POST'])
+def handle():
+    response_data = handler(request.data)
+    response = make_response(response_data)
+    response.headers.set('Content-Type', 'application/octet-stream')
+    return response
+
+
+if __name__ == "__main__":
+    app.run()
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/remote_module_verification_pb2.py b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/remote_module_verification_pb2.py
new file mode 100644
index 0000000..8e0a556
--- /dev/null
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/main/python/remote_module_verification_pb2.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: remote-module-verification.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+  name='remote-module-verification.proto',
+  package='org.apache.flink.statefun.e2e.remote',
+  syntax='proto3',
+  serialized_options=_b('\n.org.apache.flink.statefun.e2e.remote.generatedP\000'),
+  serialized_pb=_b('\n remote-module-verification.proto\x12$org.apache.flink.statefun.e2e.remote\"\x08\n\x06Invoke\"\x1c\n\x0bInvokeCount\x12\r\n\x05\x63ount\x18\x01 \x01(\x05\"0\n\x0cInvokeResult\x12\n\n\x02id\x18\x01 \x01(\t\x12\x14\n\x0cinvoke_count\x18\x02 \x01(\x05\x42\x32\n.org.apache.flink.statefun.e2e.remote.generatedP\x00\x62\x06proto3')
+)
+
+
+
+
+_INVOKE = _descriptor.Descriptor(
+  name='Invoke',
+  full_name='org.apache.flink.statefun.e2e.remote.Invoke',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=74,
+  serialized_end=82,
+)
+
+
+_INVOKECOUNT = _descriptor.Descriptor(
+  name='InvokeCount',
+  full_name='org.apache.flink.statefun.e2e.remote.InvokeCount',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='count', full_name='org.apache.flink.statefun.e2e.remote.InvokeCount.count', index=0,
+      number=1, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=84,
+  serialized_end=112,
+)
+
+
+_INVOKERESULT = _descriptor.Descriptor(
+  name='InvokeResult',
+  full_name='org.apache.flink.statefun.e2e.remote.InvokeResult',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='id', full_name='org.apache.flink.statefun.e2e.remote.InvokeResult.id', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='invoke_count', full_name='org.apache.flink.statefun.e2e.remote.InvokeResult.invoke_count', index=1,
+      number=2, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=114,
+  serialized_end=162,
+)
+
+DESCRIPTOR.message_types_by_name['Invoke'] = _INVOKE
+DESCRIPTOR.message_types_by_name['InvokeCount'] = _INVOKECOUNT
+DESCRIPTOR.message_types_by_name['InvokeResult'] = _INVOKERESULT
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+Invoke = _reflection.GeneratedProtocolMessageType('Invoke', (_message.Message,), dict(
+  DESCRIPTOR = _INVOKE,
+  __module__ = 'remote_module_verification_pb2'
+  # @@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.Invoke)
+  ))
+_sym_db.RegisterMessage(Invoke)
+
+InvokeCount = _reflection.GeneratedProtocolMessageType('InvokeCount', (_message.Message,), dict(
+  DESCRIPTOR = _INVOKECOUNT,
+  __module__ = 'remote_module_verification_pb2'
+  # @@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.InvokeCount)
+  ))
+_sym_db.RegisterMessage(InvokeCount)
+
+InvokeResult = _reflection.GeneratedProtocolMessageType('InvokeResult', (_message.Message,), dict(
+  DESCRIPTOR = _INVOKERESULT,
+  __module__ = 'remote_module_verification_pb2'
+  # @@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.InvokeResult)
+  ))
+_sym_db.RegisterMessage(InvokeResult)
+
+
+DESCRIPTOR._options = None
+# @@protoc_insertion_point(module_scope)


[flink-statefun] 02/08: [FLINK-17518] [build] Add a root.dir property to Maven

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit bd06c18f7305ae494d550355add9937b1db5ba77
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 21 12:58:20 2020 +0800

    [FLINK-17518] [build] Add a root.dir property to Maven
    
    Maven, by default, doesn't have a property that points to the root
    parent module's directory. We work around that by using the
    directory-maven-plugin to define that.
---
 pom.xml | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/pom.xml b/pom.xml
index e1f3668..5516230 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@ under the License.
         <unixsocket.version>2.3.2</unixsocket.version>
         <protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
         <flink.version>1.10.1</flink.version>
+        <root.dir>${rootDir}</root.dir>
     </properties>
 
     <dependencies>
@@ -313,6 +314,24 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
+
+            <plugin>
+                <groupId>org.commonjava.maven.plugins</groupId>
+                <artifactId>directory-maven-plugin</artifactId>
+                <version>0.1</version>
+                <executions>
+                    <execution>
+                        <id>directories</id>
+                        <goals>
+                            <goal>highest-basedir</goal>
+                        </goals>
+                        <phase>initialize</phase>
+                        <configuration>
+                            <property>rootDir</property>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>