Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/15 11:08:51 UTC

[flink-statefun] branch master updated (4708701 -> 50b4d1c)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 4708701  [FLINK-17684] [e2e] Enable E2E tests in Travis builds
     new c2a2648  [FLINK-17516] [e2e] Refactor StatefulFunctionsAppContainers to use Builder pattern
     new 5244bb6  [FLINK-17516] [e2e] Expose master REST port in StatefulFunctionsAppContainers
     new 84afed9  [FLINK-17516] [e2e] Enable exactly-once checkpointing in all e2e tests
     new 60f4bd7  [FLINK-17516] [e2e] Bind checkpoint dirs to temporary local directory
     new e518ea0  [FLINK-17516] [e2e] Allow restarting workers with StatefulFunctionsAppContainers
     new c57ca3e  [FLINK-17516] [e2e] Add verification app for exactly-once E2E
     new bd30ec0  [FLINK-17516] [e2e] Implement exactly-once E2E against ExactlyOnceVerificationModule
     new ea14e2c  [FLINK-17712] [build] Upgrade Flink version to 1.10.1
     new d146a90  [FLINK-16928] [core] Remove legacy scheduler config validation
     new cabe2ea  [FLINK-16928] Remove jobmanager.scheduler from all config files and docs
     new 3927424  [FLINK-17533] Add UnboundedFeedbackLoggerFactory
     new b3f6b84  [FLINK-17533] Expose the UnboundedFeedbackLoggerFactory
     new 4aac946  [FLINK-17533] Expose checkpointId from barrier messages
     new 12626a5  [FLINK-17533] Add support for multiple concurrent checkpoints
     new 6425f42  [FLINK-17533] Remove concurrent checkpoints limitation
     new c49de86  [hotfix] Remove unused code
     new 50b4d1c  [hotfix] Avoid raw types in UnboundedFeedbackLoggerTest

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/deployment-and-operations/packaging.md        |   2 -
 pom.xml                                            |   2 +-
 statefun-e2e-tests/pom.xml                         |   1 +
 .../e2e/common/StatefulFunctionsAppContainers.java | 363 ++++++++++++---------
 .../src/main/resources/flink-conf.yaml             |   3 +-
 .../pom.xml                                        |  17 +-
 .../flink/statefun/e2e/exactlyonce}/Constants.java |  19 +-
 .../ExactlyOnceVerificationModule.java}            |  25 +-
 .../flink/statefun/e2e/exactlyonce/FnCounter.java  |  30 +-
 .../statefun/e2e/exactlyonce/FnUnwrapper.java      |  21 +-
 .../flink/statefun/e2e/exactlyonce}/KafkaIO.java   |  50 +--
 .../main/protobuf/exactly-once-verification.proto} |  22 +-
 .../statefun/e2e/exactlyonce/ExactlyOnceE2E.java   | 155 +++++++++
 .../src/test/resources/Dockerfile                  |   4 +-
 .../src/test/resources/log4j.properties            |   0
 .../e2e/routablekafka/RoutableKafkaE2E.java        |   5 +-
 .../statefun/e2e/sanity/SanityVerificationE2E.java |   5 +-
 .../core/StatefulFunctionsConfigValidator.java     |  22 --
 .../flink/core/common/SerializablePredicate.java   |  23 --
 .../statefun/flink/core/feedback/Checkpoints.java  |  61 ++++
 .../flink/core/feedback/FeedbackUnionOperator.java |  34 +-
 .../feedback/FeedbackUnionOperatorFactory.java     |   6 +-
 .../core/functions/AsyncMessageDecorator.java      |   5 +-
 ...edStreamOperations.java => FeedbackLogger.java} |  14 +-
 .../flink/statefun/flink/core/logger/Loggers.java  |   7 +-
 .../flink/core/logger/UnboundedFeedbackLogger.java |  16 +-
 ...ry.java => UnboundedFeedbackLoggerFactory.java} |  30 +-
 .../flink/statefun/flink/core/message/Message.java |  11 +-
 .../flink/core/message/ProtobufMessage.java        |   9 +-
 .../statefun/flink/core/message/SdkMessage.java    |   5 +-
 .../flink/core/translation/FlinkUniverse.java      |   7 +-
 .../flink/core/StatefulFunctionsConfigTest.java    |  12 -
 .../flink/core/feedback/CheckpointsTest.java       | 143 ++++++++
 .../core/logger/UnboundedFeedbackLoggerTest.java   |   6 +-
 tools/docker/Dockerfile                            |   2 +-
 .../conf/flink-conf.yaml                           |   2 -
 tools/k8s/templates/config-map.yaml                |   1 -
 37 files changed, 781 insertions(+), 359 deletions(-)
 copy statefun-e2e-tests/{statefun-sanity-e2e => statefun-exactly-once-e2e}/pom.xml (94%)
 copy statefun-e2e-tests/{statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka => statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce}/Constants.java (61%)
 copy statefun-e2e-tests/{statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java => statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java} (60%)
 copy statefun-examples/statefun-flink-harness-example/src/main/java/org/apache/flink/statefun/examples/harness/MyFunction.java => statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java (51%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/grpcfn/GrpcFunction.java => statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java (61%)
 copy statefun-e2e-tests/{statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity => statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce}/KafkaIO.java (54%)
 copy statefun-e2e-tests/{statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto => statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto} (69%)
 create mode 100644 statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java
 copy statefun-e2e-tests/{statefun-sanity-e2e => statefun-exactly-once-e2e}/src/test/resources/Dockerfile (85%)
 copy statefun-e2e-tests/{statefun-sanity-e2e => statefun-exactly-once-e2e}/src/test/resources/log4j.properties (100%)
 delete mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/SerializablePredicate.java
 create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/{CheckpointedStreamOperations.java => FeedbackLogger.java} (72%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/{KeyGroupStreamFactory.java => UnboundedFeedbackLoggerFactory.java} (55%)
 create mode 100644 statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java


[flink-statefun] 05/17: [FLINK-17516] [e2e] Allow restarting workers with StatefulFunctionsAppContainers

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit e518ea0717dc54b9e4d6af96aa1839df38f3fa1b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 14 12:50:35 2020 +0800

    [FLINK-17516] [e2e] Allow restarting workers with StatefulFunctionsAppContainers
---
 .../e2e/common/StatefulFunctionsAppContainers.java       | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 9894c2d..f03a5de 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -168,6 +168,22 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     return master.getMappedPort(8081);
   }
 
+  /**
+   * Restarts a single worker of this Stateful Functions application.
+   *
+   * @param workerIndex the index of the worker to restart.
+   */
+  public void restartWorker(int workerIndex) {
+    if (workerIndex < 0 || workerIndex >= workers.size()) {
+      throw new IndexOutOfBoundsException(
+          "Invalid worker index; valid values are 0 to " + (workers.size() - 1));
+    }
+
+    final GenericContainer<?> worker = workers.get(workerIndex);
+    worker.stop();
+    worker.start();
+  }
+
   private static File temporaryCheckpointDir() throws IOException {
     final Path currentWorkingDir = Paths.get(System.getProperty("user.dir"));
     return Files.createTempDirectory(currentWorkingDir, "statefun-app-checkpoints-").toFile();


[flink-statefun] 16/17: [hotfix] Remove unused code

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit c49de86e946e1b1d46aa36cce22f61fa2e638198
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 23:18:33 2020 +0200

    [hotfix] Remove unused code
---
 .../flink/core/common/SerializablePredicate.java   | 23 ----------------------
 .../flink/statefun/flink/core/logger/Loggers.java  | 14 -------------
 2 files changed, 37 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/SerializablePredicate.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/SerializablePredicate.java
deleted file mode 100644
index 05e6c48..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/SerializablePredicate.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.common;
-
-import java.io.Serializable;
-import java.util.function.Predicate;
-
-public interface SerializablePredicate<T> extends Predicate<T>, Serializable {}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
index 948a808..aee7536 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
@@ -36,20 +36,6 @@ import org.apache.flink.util.ResourceGuard.Lease;
 public final class Loggers {
   private Loggers() {}
 
-  public static UnboundedFeedbackLogger<?> unboundedSpillableLogger(
-      IOManager ioManager,
-      int maxParallelism,
-      long inMemoryMaxBufferSize,
-      TypeSerializer<?> serializer,
-      Function<?, ?> keySelector) {
-
-    UnboundedFeedbackLoggerFactory<?> factory =
-        unboundedSpillableLoggerFactory(
-            ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector);
-
-    return factory.create();
-  }
-
   public static UnboundedFeedbackLoggerFactory<?> unboundedSpillableLoggerFactory(
       IOManager ioManager,
       int maxParallelism,


[flink-statefun] 06/17: [FLINK-17516] [e2e] Add verification app for exactly-once E2E

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit c57ca3e8ba6f18c796e5db33150706e8462574a3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 18:33:05 2020 +0800

    [FLINK-17516] [e2e] Add verification app for exactly-once E2E
---
 statefun-e2e-tests/pom.xml                         |   1 +
 .../statefun-exactly-once-e2e/pom.xml              | 109 +++++++++++++++++++++
 .../flink/statefun/e2e/exactlyonce/Constants.java  |  39 ++++++++
 .../exactlyonce/ExactlyOnceVerificationModule.java |  65 ++++++++++++
 .../flink/statefun/e2e/exactlyonce/FnCounter.java  |  47 +++++++++
 .../statefun/e2e/exactlyonce/FnUnwrapper.java      |  40 ++++++++
 .../flink/statefun/e2e/exactlyonce/KafkaIO.java    |  92 +++++++++++++++++
 .../main/protobuf/exactly-once-verification.proto  |  39 ++++++++
 8 files changed, 432 insertions(+)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index e77429a..13f2178 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -32,6 +32,7 @@ under the License.
         <module>statefun-e2e-tests-common</module>
         <module>statefun-sanity-e2e</module>
         <module>statefun-routable-kafka-e2e</module>
+        <module>statefun-exactly-once-e2e</module>
     </modules>
 
     <build>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
new file mode 100644
index 0000000..0ea4d6a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>statefun-e2e-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>2.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-exactly-once-e2e</artifactId>
+
+    <properties>
+        <testcontainers.version>1.12.5</testcontainers.version>
+    </properties>
+
+    <dependencies>
+        <!-- Stateful Functions -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-kafka-io</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Protobuf -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+
+        <!-- logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.15</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>3.14.6</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- End-to-end test common -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-e2e-tests-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Testcontainers KafkaContainer -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <version>${protoc-jar-maven-plugin.version}</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <excludePackageNames>org.apache.flink.statefun.e2e.exactlyonce.generated</excludePackageNames>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
new file mode 100644
index 0000000..ab66809
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+
+final class Constants {
+
+  private Constants() {}
+
+  static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
+
+  static final IngressIdentifier<WrappedMessage> INGRESS_ID =
+      new IngressIdentifier<>(
+          WrappedMessage.class, "org.apache.flink.e2e.exactlyonce", "wrapped-messages");
+
+  static final EgressIdentifier<InvokeCount> EGRESS_ID =
+      new EgressIdentifier<>(
+          "org.apache.flink.e2e.exactlyonce", "invoke-counts", InvokeCount.class);
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
new file mode 100644
index 0000000..9dbb465
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+/**
+ * This is a simple application used for testing end-to-end exactly-once semantics.
+ *
+ * <p>The application reads {@link WrappedMessage}s from a Kafka ingress and routes them to {@link
+ * FnUnwrapper} functions, which in turn simply forward the messages to {@link FnCounter} functions
+ * with the target keys defined in the wrapped message. The counter function keeps count of the
+ * number of times each key has been invoked, and sinks that count to an exactly-once delivery
+ * Kafka egress for verification.
+ */
+@AutoService(StatefulFunctionModule.class)
+public class ExactlyOnceVerificationModule implements StatefulFunctionModule {
+
+  @Override
+  public void configure(Map<String, String> globalConfiguration, Binder binder) {
+    String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
+    if (kafkaBootstrapServers == null) {
+      throw new IllegalStateException(
+          "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
+    }
+
+    configureKafkaIO(kafkaBootstrapServers, binder);
+    configureAddressTaggerFunctions(binder);
+  }
+
+  private static void configureKafkaIO(String kafkaAddress, Binder binder) {
+    final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
+
+    binder.bindIngress(kafkaIO.getIngressSpec());
+    binder.bindIngressRouter(
+        Constants.INGRESS_ID,
+        ((message, downstream) -> downstream.forward(FnUnwrapper.TYPE, message.getKey(), message)));
+
+    binder.bindEgress(kafkaIO.getEgressSpec());
+  }
+
+  private static void configureAddressTaggerFunctions(Binder binder) {
+    binder.bindFunctionProvider(FnUnwrapper.TYPE, ignored -> new FnUnwrapper());
+    binder.bindFunctionProvider(FnCounter.TYPE, ignored -> new FnCounter());
+  }
+}
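
The message flow bound by this module (ingress router, then unwrapper, then counter, then egress) can be modelled in plain Java to see which field addresses which step. This is only an illustrative sketch without any StateFun dependency: `ExactlyOnceFlowSketch`, `Wrapped`, and `Count` are hypothetical stand-ins for the module, `WrappedMessage`, and `InvokeCount`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ExactlyOnceFlowSketch {

  /** Stand-in for the WrappedMessage protobuf type (fields: invokeTargetId, key). */
  static final class Wrapped {
    final String invokeTargetId;
    final String key;

    Wrapped(String invokeTargetId, String key) {
      this.invokeTargetId = invokeTargetId;
      this.key = key;
    }
  }

  /** Stand-in for the InvokeCount protobuf type (fields: id, invokeCount). */
  static final class Count {
    final String id;
    final int invokeCount;

    Count(String id, int invokeCount) {
      this.id = id;
      this.invokeCount = invokeCount;
    }
  }

  // Per-counter state, keyed by function id (models the persisted "invoke-count" value).
  private final Map<String, Integer> counterState = new HashMap<>();
  // Records that would be written to the egress, in emission order.
  private final List<Count> egress = new ArrayList<>();

  /** Models one message passing through router, unwrapper, and counter. */
  void ingest(Wrapped message) {
    // Router step: the unwrapper instance is addressed by message.key. It keeps no
    // state, so the address has no observable effect in this model.
    // Unwrapper step: forward to the counter addressed by invokeTargetId.
    invokeCounter(message.invokeTargetId);
  }

  private void invokeCounter(String counterId) {
    int currentCount = counterState.getOrDefault(counterId, 0) + 1;
    counterState.put(counterId, currentCount);
    egress.add(new Count(counterId, currentCount));
  }

  List<Count> egress() {
    return egress;
  }

  public static void main(String[] args) {
    ExactlyOnceFlowSketch flow = new ExactlyOnceFlowSketch();
    flow.ingest(new Wrapped("counter-a", "k1"));
    flow.ingest(new Wrapped("counter-a", "k2"));
    for (Count c : flow.egress()) {
      System.out.println(c.id + " -> " + c.invokeCount);
    }
  }
}
```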
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
new file mode 100644
index 0000000..5243ebd
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+final class FnCounter implements StatefulFunction {
+
+  static final FunctionType TYPE = new FunctionType("org.apache.flink.e2e.exactlyonce", "counter");
+
+  @Persisted
+  private final PersistedValue<Integer> invokeCountState =
+      PersistedValue.of("invoke-count", Integer.class);
+
+  @Override
+  public void invoke(Context context, Object input) {
+    final int previousCount = invokeCountState.getOrDefault(0);
+    final int currentCount = previousCount + 1;
+
+    final InvokeCount invokeCount =
+        InvokeCount.newBuilder().setId(context.self().id()).setInvokeCount(currentCount).build();
+    invokeCountState.set(currentCount);
+
+    context.send(Constants.EGRESS_ID, invokeCount);
+  }
+}
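
Because each counter emits its running count, a consumer of the egress topic can check exactly-once behavior by confirming that, per id, the counts form the sequence 1, 2, 3, ... with no repeats and no gaps. The sketch below illustrates that idea under the assumption that records for one id arrive in order; `InvokeCountVerifierSketch` and `Observed` are hypothetical names, and this is not the verification logic of `ExactlyOnceE2E`, which is not part of this commit.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class InvokeCountVerifierSketch {

  /** One record read from the egress topic: counter id and its reported count. */
  static final class Observed {
    final String id;
    final int invokeCount;

    Observed(String id, int invokeCount) {
      this.id = id;
      this.invokeCount = invokeCount;
    }
  }

  /**
   * Returns true if, for every id, the observed counts are exactly 1, 2, 3, ... in order.
   * A repeated count suggests a duplicated effect; a skipped count suggests a lost one.
   */
  static boolean countsAreContiguous(List<Observed> records) {
    Map<String, Integer> lastSeen = new HashMap<>();
    for (Observed record : records) {
      int expected = lastSeen.getOrDefault(record.id, 0) + 1;
      if (record.invokeCount != expected) {
        return false;
      }
      lastSeen.put(record.id, record.invokeCount);
    }
    return true;
  }

  public static void main(String[] args) {
    List<Observed> clean =
        Arrays.asList(new Observed("a", 1), new Observed("b", 1), new Observed("a", 2));
    List<Observed> duplicated =
        Arrays.asList(new Observed("a", 1), new Observed("a", 1), new Observed("a", 2));

    System.out.println("clean: " + countsAreContiguous(clean));
    System.out.println("duplicated: " + countsAreContiguous(duplicated));
  }
}
```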
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
new file mode 100644
index 0000000..990e545
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+
+final class FnUnwrapper implements StatefulFunction {
+
+  static final FunctionType TYPE =
+      new FunctionType("org.apache.flink.e2e.exactlyonce", "unwrapper");
+
+  @Override
+  public void invoke(Context context, Object input) {
+    final WrappedMessage message = requireWrappedMessage(input);
+    context.send(FnCounter.TYPE, message.getInvokeTargetId(), message);
+  }
+
+  private static WrappedMessage requireWrappedMessage(Object input) {
+    return (WrappedMessage) input;
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
new file mode 100644
index 0000000..4df60ca
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+final class KafkaIO {
+
+  static final String WRAPPED_MESSAGES_TOPIC_NAME = "wrapped-messages";
+  static final String INVOKE_COUNTS_TOPIC_NAME = "invoke-counts";
+
+  private final String kafkaAddress;
+
+  KafkaIO(String kafkaAddress) {
+    this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
+  }
+
+  IngressSpec<WrappedMessage> getIngressSpec() {
+    return KafkaIngressBuilder.forIdentifier(Constants.INGRESS_ID)
+        .withTopic(KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME)
+        .withKafkaAddress(kafkaAddress)
+        .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
+        .withConsumerGroupId("exactly-once-e2e")
+        .withDeserializer(WrappedMessageKafkaDeserializer.class)
+        .build();
+  }
+
+  EgressSpec<InvokeCount> getEgressSpec() {
+    return KafkaEgressBuilder.forIdentifier(Constants.EGRESS_ID)
+        .withKafkaAddress(kafkaAddress)
+        .withExactlyOnceProducerSemantics(Duration.ofMinutes(1))
+        .withSerializer(InvokeCountKafkaSerializer.class)
+        .build();
+  }
+
+  private static final class WrappedMessageKafkaDeserializer
+      implements KafkaIngressDeserializer<WrappedMessage> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public WrappedMessage deserialize(ConsumerRecord<byte[], byte[]> input) {
+      try {
+        return WrappedMessage.parseFrom(input.value());
+      } catch (Exception e) {
+        throw new RuntimeException("Error deserializing messages", e);
+      }
+    }
+  }
+
+  private static final class InvokeCountKafkaSerializer
+      implements KafkaEgressSerializer<InvokeCount> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(InvokeCount invokeCount) {
+      final byte[] key = invokeCount.getIdBytes().toByteArray();
+      final byte[] value = invokeCount.toByteArray();
+
+      return new ProducerRecord<>(INVOKE_COUNTS_TOPIC_NAME, key, value);
+    }
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
new file mode 100644
index 0000000..5e8b41a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+option java_package = "org.apache.flink.statefun.e2e.exactlyonce.generated";
+option java_multiple_files = false;
+
+message WrappedMessage {
+    string invokeTargetId = 1;
+    string key = 2;
+}
+
+message FnAddress {
+    string namespace = 1;
+    string type = 2;
+    string id = 3;
+}
+
+message InvokeCount {
+    string id = 1;
+    int32 invokeCount = 2;
+}
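
The InvokeCount message above pairs a function id with a running invocation count. A minimal sketch of how such records could be checked for exactly-once delivery follows, assuming that each id should emit the counts 1, 2, 3, ... with no gaps or repeats. The class InvokeCountVerifierSketch and its nested InvokeCount stand-in are hypothetical and are not part of this commit; the actual E2E verification may differ.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; not part of the commit. */
final class InvokeCountVerifierSketch {

  /** Plain-Java stand-in for the generated InvokeCount protobuf class. */
  static final class InvokeCount {
    final String id;
    final int invokeCount;

    InvokeCount(String id, int invokeCount) {
      this.id = id;
      this.invokeCount = invokeCount;
    }
  }

  /**
   * Assumption: under exactly-once semantics every id emits 1, 2, 3, ... in order.
   * A repeated count suggests a duplicate, a skipped count suggests a loss.
   */
  static boolean isExactlyOnce(List<InvokeCount> records) {
    final Map<String, Integer> lastSeen = new HashMap<>();
    for (InvokeCount record : records) {
      final int expected = lastSeen.getOrDefault(record.id, 0) + 1;
      if (record.invokeCount != expected) {
        return false;
      }
      lastSeen.put(record.id, record.invokeCount);
    }
    return true;
  }
}
```

Records for different ids may interleave freely; only the per-id sequence is checked.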


[flink-statefun] 01/17: [FLINK-17516] [e2e] Refactor StatefulFunctionsAppContainers to use Builder pattern

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit c2a26488f53e765dd37a5f1326a2b2847c0960b5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue May 12 22:57:03 2020 +0800

    [FLINK-17516] [e2e] Refactor StatefulFunctionsAppContainers to use Builder pattern
---
 .../e2e/common/StatefulFunctionsAppContainers.java | 328 +++++++++++----------
 .../e2e/routablekafka/RoutableKafkaE2E.java        |   5 +-
 .../statefun/e2e/sanity/SanityVerificationE2E.java |   5 +-
 3 files changed, 183 insertions(+), 155 deletions(-)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index f648fcd..9a3f2a3 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -55,7 +55,7 @@ import org.testcontainers.images.builder.ImageFromDockerfile;
  *
  *     {@code @Rule}
  *     public StatefulFunctionsAppContainers myApp =
- *         new StatefulFunctionsAppContainers("app-name", 3);
+ *         StatefulFunctionsAppContainers.builder("app-name", 3).build();
  *
  *     {@code @Test}
  *     public void runTest() {
@@ -77,15 +77,16 @@ import org.testcontainers.images.builder.ImageFromDockerfile;
  *
  *     {@code @Rule}
  *     public StatefulFunctionsAppContainers myApp =
- *         new StatefulFunctionsAppContainers("app-name", 3)
- *             .dependsOn(kafka);
+ *         StatefulFunctionsAppContainers.builder("app-name", 3)
+ *             .dependsOn(kafka)
+ *             .build();
  *
  *     ...
  * }
  * }</pre>
  *
  * <p>Application master and worker containers will always be started after containers that are
- * added using {@link #dependsOn(GenericContainer)} have started. Moreover, containers being
+ * added using {@link Builder#dependsOn(GenericContainer)} have started. Moreover, containers being
  * depended on will also be setup such that they share the same network with the master and workers,
  * so that they can freely communicate with each other.
  *
@@ -114,70 +115,28 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
 
   private static final Logger LOG = LoggerFactory.getLogger(StatefulFunctionsAppContainers.class);
 
-  private static final String MASTER_HOST = "statefun-app-master";
-  private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
-
-  private final String appName;
-  private final int numWorkers;
-  private final Network network;
-
-  private final Configuration dynamicProperties = new Configuration();
-  private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
-  private final List<ClasspathBuildContextFile> classpathBuildContextFiles = new ArrayList<>();
-  private Logger masterLogger;
-
   private GenericContainer<?> master;
   private List<GenericContainer<?>> workers;
 
-  public StatefulFunctionsAppContainers(String appName, int numWorkers) {
-    if (appName == null || appName.isEmpty()) {
-      throw new IllegalArgumentException(
-          "App name must be non-empty. This is used as the application image name.");
-    }
-    if (numWorkers < 1) {
-      throw new IllegalArgumentException("Must have at least 1 worker.");
-    }
-
-    this.network = Network.newNetwork();
-    this.appName = appName;
-    this.numWorkers = numWorkers;
-  }
-
-  public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> container) {
-    container.withNetwork(network);
-    this.dependentContainers.add(container);
-    return this;
-  }
-
-  public StatefulFunctionsAppContainers exposeMasterLogs(Logger logger) {
-    this.masterLogger = logger;
-    return this;
+  private StatefulFunctionsAppContainers(
+      GenericContainer<?> masterContainer, List<GenericContainer<?>> workerContainers) {
+    this.master = Objects.requireNonNull(masterContainer);
+    this.workers = Objects.requireNonNull(workerContainers);
   }
 
-  public StatefulFunctionsAppContainers withModuleGlobalConfiguration(String key, String value) {
-    this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + key, value);
-    return this;
-  }
-
-  public <T> StatefulFunctionsAppContainers withConfiguration(ConfigOption<T> config, T value) {
-    this.dynamicProperties.set(config, value);
-    return this;
-  }
-
-  public StatefulFunctionsAppContainers withBuildContextFileFromClasspath(
-      String buildContextPath, String resourcePath) {
-    this.classpathBuildContextFiles.add(
-        new ClasspathBuildContextFile(buildContextPath, resourcePath));
-    return this;
+  /**
+   * Creates a builder for creating a {@link StatefulFunctionsAppContainers}.
+   *
+   * @param appName the name of the application.
+   * @param numWorkers the number of workers to run the application.
+   * @return a builder for creating a {@link StatefulFunctionsAppContainers}.
+   */
+  public static Builder builder(String appName, int numWorkers) {
+    return new Builder(appName, numWorkers);
   }
 
   @Override
   protected void before() throws Throwable {
-    final ImageFromDockerfile appImage =
-        appImage(appName, dynamicProperties, classpathBuildContextFiles);
-    this.master = masterContainer(appImage, network, dependentContainers, numWorkers, masterLogger);
-    this.workers = workerContainers(appImage, numWorkers, network);
-
     master.start();
     workers.forEach(GenericContainer::start);
   }
@@ -188,122 +147,189 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     workers.forEach(GenericContainer::stop);
   }
 
-  private static ImageFromDockerfile appImage(
-      String appName,
-      Configuration dynamicProperties,
-      List<ClasspathBuildContextFile> classpathBuildContextFiles) {
-    final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
-    LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
-
-    final ImageFromDockerfile appImage =
-        new ImageFromDockerfile(appName)
-            .withFileFromClasspath("Dockerfile", "Dockerfile")
-            .withFileFromPath(".", targetDirPath);
-
-    Configuration flinkConf = resolveFlinkConf(dynamicProperties);
-    String flinkConfString = flinkConfigAsString(flinkConf);
-    LOG.info(
-        "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
-        flinkConf);
-    appImage.withFileFromString("flink-conf.yaml", flinkConfString);
-
-    for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) {
-      appImage.withFileFromClasspath(
-          classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath);
+  public static final class Builder {
+    private static final String MASTER_HOST = "statefun-app-master";
+    private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
+
+    private final String appName;
+    private final int numWorkers;
+    private final Network network;
+
+    private final Configuration dynamicProperties = new Configuration();
+    private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
+    private final List<ClasspathBuildContextFile> classpathBuildContextFiles = new ArrayList<>();
+    private Logger masterLogger;
+
+    private Builder(String appName, int numWorkers) {
+      if (appName == null || appName.isEmpty()) {
+        throw new IllegalArgumentException(
+            "App name must be non-empty. This is used as the application image name.");
+      }
+      if (numWorkers < 1) {
+        throw new IllegalArgumentException("Must have at least 1 worker.");
+      }
+
+      this.network = Network.newNetwork();
+      this.appName = appName;
+      this.numWorkers = numWorkers;
     }
 
-    return appImage;
-  }
+    public StatefulFunctionsAppContainers.Builder dependsOn(GenericContainer<?> container) {
+      container.withNetwork(network);
+      this.dependentContainers.add(container);
+      return this;
+    }
 
-  /**
-   * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
-   * resources.
-   */
-  private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
-    final InputStream baseFlinkConfResourceInputStream =
-        StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
-    if (baseFlinkConfResourceInputStream == null) {
-      throw new RuntimeException("Base flink-conf.yaml cannot be found.");
+    public StatefulFunctionsAppContainers.Builder exposeMasterLogs(Logger logger) {
+      this.masterLogger = logger;
+      return this;
     }
 
-    final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
-    return GlobalConfiguration.loadConfiguration(
-        tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
-  }
+    public StatefulFunctionsAppContainers.Builder withModuleGlobalConfiguration(
+        String key, String value) {
+      this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + key, value);
+      return this;
+    }
 
-  private static String flinkConfigAsString(Configuration configuration) {
-    StringBuilder yaml = new StringBuilder();
-    for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
-      yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
+    public <T> StatefulFunctionsAppContainers.Builder withConfiguration(
+        ConfigOption<T> config, T value) {
+      this.dynamicProperties.set(config, value);
+      return this;
     }
 
-    return yaml.toString();
-  }
+    public StatefulFunctionsAppContainers.Builder withBuildContextFileFromClasspath(
+        String buildContextPath, String resourcePath) {
+      this.classpathBuildContextFiles.add(
+          new ClasspathBuildContextFile(buildContextPath, resourcePath));
+      return this;
+    }
 
-  private static File copyToTempFlinkConfFile(InputStream inputStream) {
-    try {
-      final File tempFile =
-          new File(
-              Files.createTempDirectory("statefun-app-containers").toString(), "flink-conf.yaml");
-      Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
-      return tempFile;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+    public StatefulFunctionsAppContainers build() {
+      final ImageFromDockerfile appImage =
+          appImage(appName, dynamicProperties, classpathBuildContextFiles);
+
+      return new StatefulFunctionsAppContainers(
+          masterContainer(appImage, network, dependentContainers, numWorkers, masterLogger),
+          workerContainers(appImage, numWorkers, network));
     }
-  }
 
-  private static GenericContainer<?> masterContainer(
-      ImageFromDockerfile appImage,
-      Network network,
-      List<GenericContainer<?>> dependents,
-      int numWorkers,
-      @Nullable Logger masterLogger) {
-    final GenericContainer<?> master =
-        new GenericContainer(appImage)
-            .withNetwork(network)
-            .withNetworkAliases(MASTER_HOST)
-            .withEnv("ROLE", "master")
-            .withEnv("MASTER_HOST", MASTER_HOST)
-            .withCommand("-p " + numWorkers);
-
-    for (GenericContainer<?> dependent : dependents) {
-      master.dependsOn(dependent);
+    private static ImageFromDockerfile appImage(
+        String appName,
+        Configuration dynamicProperties,
+        List<ClasspathBuildContextFile> classpathBuildContextFiles) {
+      final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
+      LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
+
+      final ImageFromDockerfile appImage =
+          new ImageFromDockerfile(appName)
+              .withFileFromClasspath("Dockerfile", "Dockerfile")
+              .withFileFromPath(".", targetDirPath);
+
+      Configuration flinkConf = resolveFlinkConf(dynamicProperties);
+      String flinkConfString = flinkConfigAsString(flinkConf);
+      LOG.info(
+          "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
+          flinkConf);
+      appImage.withFileFromString("flink-conf.yaml", flinkConfString);
+
+      for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) {
+        appImage.withFileFromClasspath(
+            classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath);
+      }
+
+      return appImage;
     }
 
-    if (masterLogger != null) {
-      master.withLogConsumer(new Slf4jLogConsumer(masterLogger, true));
+    /**
+     * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
+     * resources.
+     */
+    private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
+      final InputStream baseFlinkConfResourceInputStream =
+          StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
+      if (baseFlinkConfResourceInputStream == null) {
+        throw new RuntimeException("Base flink-conf.yaml cannot be found.");
+      }
+
+      final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
+      return GlobalConfiguration.loadConfiguration(
+          tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
     }
 
-    return master;
-  }
+    private static String flinkConfigAsString(Configuration configuration) {
+      StringBuilder yaml = new StringBuilder();
+      for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
+        yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
+      }
+
+      return yaml.toString();
+    }
 
-  private static List<GenericContainer<?>> workerContainers(
-      ImageFromDockerfile appImage, int numWorkers, Network network) {
-    final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
+    private static File copyToTempFlinkConfFile(InputStream inputStream) {
+      try {
+        final File tempFile =
+            new File(
+                Files.createTempDirectory("statefun-app-containers").toString(), "flink-conf.yaml");
+        Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        return tempFile;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
 
-    for (int i = 0; i < numWorkers; i++) {
-      workers.add(
+    private static GenericContainer<?> masterContainer(
+        ImageFromDockerfile appImage,
+        Network network,
+        List<GenericContainer<?>> dependents,
+        int numWorkers,
+        @Nullable Logger masterLogger) {
+      final GenericContainer<?> master =
           new GenericContainer(appImage)
               .withNetwork(network)
-              .withNetworkAliases(workerHostOf(i))
-              .withEnv("ROLE", "worker")
-              .withEnv("MASTER_HOST", MASTER_HOST));
+              .withNetworkAliases(MASTER_HOST)
+              .withEnv("ROLE", "master")
+              .withEnv("MASTER_HOST", MASTER_HOST)
+              .withCommand("-p " + numWorkers);
+
+      for (GenericContainer<?> dependent : dependents) {
+        master.dependsOn(dependent);
+      }
+
+      if (masterLogger != null) {
+        master.withLogConsumer(new Slf4jLogConsumer(masterLogger, true));
+      }
+
+      return master;
     }
 
-    return workers;
-  }
+    private static List<GenericContainer<?>> workerContainers(
+        ImageFromDockerfile appImage, int numWorkers, Network network) {
+      final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
 
-  private static String workerHostOf(int workerIndex) {
-    return WORKER_HOST_PREFIX + "-" + workerIndex;
-  }
+      for (int i = 0; i < numWorkers; i++) {
+        workers.add(
+            new GenericContainer(appImage)
+                .withNetwork(network)
+                .withNetworkAliases(workerHostOf(i))
+                .withEnv("ROLE", "worker")
+                .withEnv("MASTER_HOST", MASTER_HOST));
+      }
+
+      return workers;
+    }
+
+    private static String workerHostOf(int workerIndex) {
+      return WORKER_HOST_PREFIX + "-" + workerIndex;
+    }
 
-  private static class ClasspathBuildContextFile {
-    private final String buildContextPath;
-    private final String fromResourcePath;
+    private static class ClasspathBuildContextFile {
+      private final String buildContextPath;
+      private final String fromResourcePath;
 
-    ClasspathBuildContextFile(String buildContextPath, String fromResourcePath) {
-      this.buildContextPath = Objects.requireNonNull(buildContextPath);
-      this.fromResourcePath = Objects.requireNonNull(fromResourcePath);
+      ClasspathBuildContextFile(String buildContextPath, String fromResourcePath) {
+        this.buildContextPath = Objects.requireNonNull(buildContextPath);
+        this.fromResourcePath = Objects.requireNonNull(fromResourcePath);
+      }
     }
   }
 }
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
index 9f4eda1..3869163 100644
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
+++ b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
@@ -64,13 +64,14 @@ public class RoutableKafkaE2E {
 
   @Rule
   public StatefulFunctionsAppContainers verificationApp =
-      new StatefulFunctionsAppContainers("routable-kafka-verification", 1)
+      StatefulFunctionsAppContainers.builder("routable-kafka-verification", 1)
           .dependsOn(kafka)
           .exposeMasterLogs(LOG)
           .withBuildContextFileFromClasspath(
               "routable-kafka-ingress-module", "/routable-kafka-ingress-module/")
           .withModuleGlobalConfiguration(
-              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092");
+              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
+          .build();
 
   @Test(timeout = 60_000L)
   public void run() {
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
index 1bd08c7..473dc9b 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
@@ -63,11 +63,12 @@ public class SanityVerificationE2E {
 
   @Rule
   public StatefulFunctionsAppContainers verificationApp =
-      new StatefulFunctionsAppContainers("sanity-verification", 2)
+      StatefulFunctionsAppContainers.builder("sanity-verification", 2)
           .dependsOn(kafka)
           .exposeMasterLogs(LOG)
           .withModuleGlobalConfiguration(
-              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092");
+              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
+          .build();
 
   @Test(timeout = 60_000L)
   public void run() throws Exception {
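
The refactoring in this commit moves all configuration into a nested Builder and leaves the outer class with a private constructor. A dependency-free sketch of that shape is given below. AppContainersSketch is a hypothetical stand-in: it uses plain strings where the real class holds Testcontainers objects, and only the argument validation mirrors the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for StatefulFunctionsAppContainers; not the real class. */
final class AppContainersSketch {
  private final String appName;
  private final int numWorkers;
  private final List<String> dependencies;

  // Private: instances can only be obtained through the builder.
  private AppContainersSketch(String appName, int numWorkers, List<String> dependencies) {
    this.appName = Objects.requireNonNull(appName);
    this.numWorkers = numWorkers;
    this.dependencies = Collections.unmodifiableList(new ArrayList<>(dependencies));
  }

  static Builder builder(String appName, int numWorkers) {
    return new Builder(appName, numWorkers);
  }

  String appName() {
    return appName;
  }

  int numWorkers() {
    return numWorkers;
  }

  List<String> dependencies() {
    return dependencies;
  }

  static final class Builder {
    private final String appName;
    private final int numWorkers;
    private final List<String> dependencies = new ArrayList<>();

    private Builder(String appName, int numWorkers) {
      // Same checks as in the diff above.
      if (appName == null || appName.isEmpty()) {
        throw new IllegalArgumentException("App name must be non-empty.");
      }
      if (numWorkers < 1) {
        throw new IllegalArgumentException("Must have at least 1 worker.");
      }
      this.appName = appName;
      this.numWorkers = numWorkers;
    }

    // Assumption: a container is represented here by its name only.
    Builder dependsOn(String containerName) {
      dependencies.add(Objects.requireNonNull(containerName));
      return this;
    }

    AppContainersSketch build() {
      return new AppContainersSketch(appName, numWorkers, dependencies);
    }
  }
}
```

With this shape, a call such as AppContainersSketch.builder("app-name", 3).dependsOn("kafka").build() yields a fully configured object, and the fluent methods no longer exist on the built instance.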


[flink-statefun] 04/17: [FLINK-17516] [e2e] Bind checkpoint dirs to temporary local directory

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 60f4bd74cc7c88422dc795fda630d818943949e7
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue May 12 23:24:55 2020 +0800

    [FLINK-17516] [e2e] Bind checkpoint dirs to temporary local directory
---
 .../e2e/common/StatefulFunctionsAppContainers.java  | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 5880ae3..9894c2d 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -19,6 +19,7 @@
 package org.apache.flink.statefun.e2e.common;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -33,9 +34,11 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
+import org.apache.flink.util.FileUtils;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -118,6 +121,8 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   private GenericContainer<?> master;
   private List<GenericContainer<?>> workers;
 
+  private File checkpointDir;
+
   private StatefulFunctionsAppContainers(
       GenericContainer<?> masterContainer, List<GenericContainer<?>> workerContainers) {
     this.master = Objects.requireNonNull(masterContainer);
@@ -137,6 +142,15 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
 
   @Override
   protected void before() throws Throwable {
+    checkpointDir = temporaryCheckpointDir();
+
+    master.withFileSystemBind(
+        checkpointDir.getAbsolutePath(), "/checkpoint-dir", BindMode.READ_WRITE);
+    workers.forEach(
+        worker ->
+            worker.withFileSystemBind(
+                checkpointDir.getAbsolutePath(), "/checkpoint-dir", BindMode.READ_WRITE));
+
     master.start();
     workers.forEach(GenericContainer::start);
   }
@@ -145,6 +159,8 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   protected void after() {
     master.stop();
     workers.forEach(GenericContainer::stop);
+
+    FileUtils.deleteDirectoryQuietly(checkpointDir);
   }
 
   /** @return the exposed port on master for calling REST APIs. */
@@ -152,6 +168,11 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     return master.getMappedPort(8081);
   }
 
+  private static File temporaryCheckpointDir() throws IOException {
+    final Path currentWorkingDir = Paths.get(System.getProperty("user.dir"));
+    return Files.createTempDirectory(currentWorkingDir, "statefun-app-checkpoints-").toFile();
+  }
+
   public static final class Builder {
     private static final String MASTER_HOST = "statefun-app-master";
     private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
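
The change above creates a temporary checkpoint directory before the containers start and removes it afterwards. The same lifecycle can be sketched with java.nio alone, as below. CheckpointDirSketch and its method names are hypothetical, the parent directory is passed in rather than read from user.dir, and deleteQuietly is a simplified substitute for Flink's FileUtils.deleteDirectoryQuietly, not a copy of it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical helper; not part of the commit. */
final class CheckpointDirSketch {

  /** Creates a uniquely named directory under the given parent. */
  static Path createUnder(Path parent) {
    try {
      return Files.createTempDirectory(parent, "statefun-app-checkpoints-");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Deletes the directory tree, swallowing I/O errors (best-effort cleanup). */
  static void deleteQuietly(Path dir) {
    if (dir == null || !Files.exists(dir)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(dir)) {
      // Reverse order visits children before their parent directories.
      paths.sorted(Comparator.reverseOrder())
          .forEach(
              path -> {
                try {
                  Files.deleteIfExists(path);
                } catch (IOException ignored) {
                  // best effort only
                }
              });
    } catch (IOException ignored) {
      // best effort only
    }
  }
}
```

In a JUnit ExternalResource, createUnder would typically be called in before() and deleteQuietly in after(), matching the placement in the diff.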


[flink-statefun] 08/17: [FLINK-17712] [build] Upgrade Flink version to 1.10.1

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit ea14e2cc76dd777b460795357f16d431c53827b8
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 13:16:47 2020 +0800

    [FLINK-17712] [build] Upgrade Flink version to 1.10.1
    
    This closes #111.
---
 pom.xml                 | 2 +-
 tools/docker/Dockerfile | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2783fed..93483b4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@ under the License.
         <auto-service.version>1.0-rc6</auto-service.version>
         <protobuf.version>3.7.1</protobuf.version>
         <protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
-        <flink.version>1.10.0</flink.version>
+        <flink.version>1.10.1</flink.version>
     </properties>
 
     <dependencies>
diff --git a/tools/docker/Dockerfile b/tools/docker/Dockerfile
index 7c41d38..e697e33 100644
--- a/tools/docker/Dockerfile
+++ b/tools/docker/Dockerfile
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM flink:1.10.0
+FROM flink:1.10.1
 
 ENV ROLE worker
 ENV MASTER_HOST localhost


[flink-statefun] 12/17: [FLINK-17533] Expose the UnboundedFeedbackLoggerFactory

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit b3f6b844e9f9b568b7c94f68b7cfdf758e8e66dc
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 21:32:07 2020 +0200

    [FLINK-17533] Expose the UnboundedFeedbackLoggerFactory
---
 .../flink/statefun/flink/core/logger/Loggers.java     | 19 +++++++++++++++++--
 .../flink/core/logger/UnboundedFeedbackLogger.java    | 11 ++++-------
 .../core/logger/UnboundedFeedbackLoggerTest.java      |  2 +-
 3 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
index 62720bf..948a808 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
@@ -43,10 +43,25 @@ public final class Loggers {
       TypeSerializer<?> serializer,
       Function<?, ?> keySelector) {
 
+    UnboundedFeedbackLoggerFactory<?> factory =
+        unboundedSpillableLoggerFactory(
+            ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector);
+
+    return factory.create();
+  }
+
+  public static UnboundedFeedbackLoggerFactory<?> unboundedSpillableLoggerFactory(
+      IOManager ioManager,
+      int maxParallelism,
+      long inMemoryMaxBufferSize,
+      TypeSerializer<?> serializer,
+      Function<?, ?> keySelector) {
+
     ObjectContainer container =
         unboundedSpillableLoggerContainer(
             ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector);
-    return container.get(UnboundedFeedbackLogger.class);
+
+    return container.get(UnboundedFeedbackLoggerFactory.class);
   }
 
   /** Wires the required dependencies to construct an {@link UnboundedFeedbackLogger}. */
@@ -70,7 +85,7 @@ public final class Loggers {
         "checkpoint-stream-ops",
         CheckpointedStreamOperations.class,
         KeyedStateCheckpointOutputStreamOps.INSTANCE);
-    container.add(UnboundedFeedbackLogger.class);
+    container.add(UnboundedFeedbackLoggerFactory.class);
     return container;
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
index 8ddc22e..ef0360a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
@@ -35,8 +35,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.statefun.flink.core.di.Inject;
-import org.apache.flink.statefun.flink.core.di.Label;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
 import org.apache.flink.util.IOUtils;
 
@@ -50,12 +48,11 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
   private TypeSerializer<T> serializer;
   private Closeable snapshotLease;
 
-  @Inject
   public UnboundedFeedbackLogger(
-      @Label("key-group-supplier") Supplier<KeyGroupStream<T>> supplier,
-      @Label("key-group-assigner") ToIntFunction<T> keyGroupAssigner,
-      @Label("checkpoint-stream-ops") CheckpointedStreamOperations ops,
-      @Label("envelope-serializer") TypeSerializer<T> serializer) {
+      Supplier<KeyGroupStream<T>> supplier,
+      ToIntFunction<T> keyGroupAssigner,
+      CheckpointedStreamOperations ops,
+      TypeSerializer<T> serializer) {
     this.supplier = Objects.requireNonNull(supplier);
     this.keyGroupAssigner = Objects.requireNonNull(keyGroupAssigner);
     this.serializer = Objects.requireNonNull(serializer);
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
index 8b66b21..75c4c55 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
@@ -115,7 +115,7 @@ public class UnboundedFeedbackLoggerTest {
             IO_MANAGER, maxParallelism, totalMemory, IntSerializer.INSTANCE, Function.identity());
 
     container.add("checkpoint-stream-ops", CheckpointedStreamOperations.class, NOOP.INSTANCE);
-    return container.get(UnboundedFeedbackLogger.class);
+    return container.get(UnboundedFeedbackLoggerFactory.class).create();
   }
 
   enum NOOP implements CheckpointedStreamOperations {


[flink-statefun] 14/17: [FLINK-17533] Add support for multiple concurrent checkpoints

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 12626a5b381dfebb40515d03992bdf243f7e603d
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 22:32:10 2020 +0200

    [FLINK-17533] Add support for multiple concurrent checkpoints
---
 .../statefun/flink/core/feedback/Checkpoints.java  |  61 +++++++++
 .../flink/core/feedback/FeedbackUnionOperator.java |  34 ++---
 .../statefun/flink/core/logger/FeedbackLogger.java |  33 +++++
 .../flink/core/logger/UnboundedFeedbackLogger.java |   5 +-
 .../flink/core/feedback/CheckpointsTest.java       | 143 +++++++++++++++++++++
 5 files changed, 260 insertions(+), 16 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java
new file mode 100644
index 0000000..8fd0322
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.feedback;
+
+import java.io.OutputStream;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;
+import org.apache.flink.util.IOUtils;
+
+final class Checkpoints<T> implements AutoCloseable {
+  private final Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory;
+  private final TreeMap<Long, FeedbackLogger<T>> uncompletedCheckpoints = new TreeMap<>();
+
+  Checkpoints(Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory) {
+    this.feedbackLoggerFactory = Objects.requireNonNull(feedbackLoggerFactory);
+  }
+
+  public void startLogging(long checkpointId, OutputStream outputStream) {
+    FeedbackLogger<T> logger = feedbackLoggerFactory.get();
+    logger.startLogging(outputStream);
+    uncompletedCheckpoints.put(checkpointId, logger);
+  }
+
+  public void append(T element) {
+    for (FeedbackLogger<T> logger : uncompletedCheckpoints.values()) {
+      logger.append(element);
+    }
+  }
+
+  public void commitCheckpointsUntil(long checkpointId) {
+    SortedMap<Long, FeedbackLogger<T>> completedCheckpoints =
+        uncompletedCheckpoints.headMap(checkpointId, true);
+    completedCheckpoints.values().forEach(FeedbackLogger::commit);
+    completedCheckpoints.clear();
+  }
+
+  @Override
+  public void close() {
+    IOUtils.closeAllQuietly(uncompletedCheckpoints.values());
+    uncompletedCheckpoints.clear();
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
index 402ded8..78fe10d 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.statefun.flink.core.feedback;
 
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.concurrent.Executor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -26,9 +27,9 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
 import org.apache.flink.statefun.flink.core.common.SerializableFunction;
-import org.apache.flink.statefun.flink.core.common.SerializablePredicate;
 import org.apache.flink.statefun.flink.core.logger.Loggers;
 import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLogger;
+import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLoggerFactory;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.MailboxExecutor;
@@ -43,20 +44,20 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
 
   // -- configuration
   private final FeedbackKey<T> feedbackKey;
-  private final SerializablePredicate<T> isBarrierMessage;
+  private final SerializableFunction<T, OptionalLong> isBarrierMessage;
   private final SerializableFunction<T, ?> keySelector;
   private final long totalMemoryUsedForFeedbackCheckpointing;
   private final TypeSerializer<T> elementSerializer;
 
   // -- runtime
-  private transient UnboundedFeedbackLogger<T> feedbackLogger;
+  private transient Checkpoints<T> checkpoints;
   private transient boolean closedOrDisposed;
   private transient MailboxExecutor mailboxExecutor;
   private transient StreamRecord<T> reusable;
 
   FeedbackUnionOperator(
       FeedbackKey<T> feedbackKey,
-      SerializablePredicate<T> isBarrierMessage,
+      SerializableFunction<T, OptionalLong> isBarrierMessage,
       SerializableFunction<T, ?> keySelector,
       long totalMemoryUsedForFeedbackCheckpointing,
       TypeSerializer<T> elementSerializer,
@@ -84,11 +85,12 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
     if (closedOrDisposed) {
       return;
     }
-    if (isBarrierMessage.test(element)) {
-      feedbackLogger.commit();
+    OptionalLong maybeCheckpoint = isBarrierMessage.apply(element);
+    if (maybeCheckpoint.isPresent()) {
+      checkpoints.commitCheckpointsUntil(maybeCheckpoint.getAsLong());
     } else {
       sendDownstream(element);
-      feedbackLogger.append(element);
+      checkpoints.append(element);
     }
   }
 
@@ -105,22 +107,24 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
     // Initialize the unbounded feedback logger
     //
     @SuppressWarnings("unchecked")
-    UnboundedFeedbackLogger<T> feedbackLogger =
-        (UnboundedFeedbackLogger<T>)
-            Loggers.unboundedSpillableLogger(
+    UnboundedFeedbackLoggerFactory<T> feedbackLoggerFactory =
+        (UnboundedFeedbackLoggerFactory<T>)
+            Loggers.unboundedSpillableLoggerFactory(
                 ioManager,
                 maxParallelism,
                 totalMemoryUsedForFeedbackCheckpointing,
                 elementSerializer,
                 keySelector);
 
-    this.feedbackLogger = feedbackLogger;
+    this.checkpoints = new Checkpoints<>(feedbackLoggerFactory::create);
+
     //
     // we first must reply previously check-pointed envelopes before we start
     // processing any new envelopes.
     //
+    UnboundedFeedbackLogger<T> logger = feedbackLoggerFactory.create();
     for (KeyGroupStatePartitionStreamProvider keyedStateInput : context.getRawKeyedStateInputs()) {
-      this.feedbackLogger.replyLoggedEnvelops(keyedStateInput.getStream(), this);
+      logger.replyLoggedEnvelops(keyedStateInput.getStream(), this);
     }
     //
     // now we can start processing new messages. We do so by registering ourselves as a
@@ -132,7 +136,7 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
   @Override
   public void snapshotState(StateSnapshotContext context) throws Exception {
     super.snapshotState(context);
-    this.feedbackLogger.startLogging(context.getRawKeyedOperatorStateOutput());
+    checkpoints.startLogging(context.getCheckpointId(), context.getRawKeyedOperatorStateOutput());
   }
 
   @Override
@@ -152,8 +156,8 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
   // ------------------------------------------------------------------------------------------------------------------
 
   private void closeInternally() {
-    IOUtils.closeQuietly(feedbackLogger);
-    feedbackLogger = null;
+    IOUtils.closeQuietly(checkpoints);
+    checkpoints = null;
     closedOrDisposed = true;
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java
new file mode 100644
index 0000000..465a717
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.logger;
+
+import java.io.OutputStream;
+
+public interface FeedbackLogger<T> extends AutoCloseable {
+
+  /** Start logging messages into the supplied output stream. */
+  void startLogging(OutputStream keyedStateCheckpointOutputStream);
+
+  /** Append a message to the currently logging logger. */
+  void append(T message);
+
+  /** Commit the currently logging logger. */
+  void commit();
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
index ef0360a..409f714 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
@@ -38,7 +38,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
 import org.apache.flink.util.IOUtils;
 
-public final class UnboundedFeedbackLogger<T> implements Closeable {
+public final class UnboundedFeedbackLogger<T> implements FeedbackLogger<T> {
   private final Supplier<KeyGroupStream<T>> supplier;
   private final ToIntFunction<T> keyGroupAssigner;
   private final Map<Integer, KeyGroupStream<T>> keyGroupStreams;
@@ -60,6 +60,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
     this.checkpointedStreamOperations = Objects.requireNonNull(ops);
   }
 
+  @Override
   public void startLogging(OutputStream keyedStateCheckpointOutputStream) {
     this.checkpointedStreamOperations.requireKeyedStateCheckpointed(
         keyedStateCheckpointOutputStream);
@@ -68,6 +69,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
         checkpointedStreamOperations.acquireLease(keyedStateCheckpointOutputStream);
   }
 
+  @Override
   public void append(T message) {
     if (keyedStateOutputStream == null) {
       //
@@ -79,6 +81,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
     keyGroup.append(message);
   }
 
+  @Override
   public void commit() {
     try {
       flushToKeyedStateOutputStream();
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java
new file mode 100644
index 0000000..7b8dbee
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.feedback;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;
+
+public class CheckpointsTest {
+
+  @Test
+  public void usageExample() {
+    Loggers loggers = new Loggers();
+
+    Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+    checkpoints.startLogging(1, new ByteArrayOutputStream());
+    checkpoints.append("hello");
+    checkpoints.append("world");
+    checkpoints.commitCheckpointsUntil(1);
+
+    assertThat(loggers.items(0), contains("hello", "world"));
+    assertThat(loggers.state(0), is(LoggerState.COMMITTED));
+  }
+
+  @Test
+  public void dataIsAppendedToMultipleLoggers() {
+    Loggers loggers = new Loggers();
+
+    Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+
+    checkpoints.startLogging(1, new ByteArrayOutputStream());
+    checkpoints.append("a");
+
+    checkpoints.startLogging(2, new ByteArrayOutputStream());
+    checkpoints.append("b");
+
+    checkpoints.commitCheckpointsUntil(1);
+    checkpoints.append("c");
+
+    checkpoints.commitCheckpointsUntil(2);
+
+    assertThat(loggers.items(0), contains("a", "b"));
+    assertThat(loggers.items(1), contains("b", "c"));
+  }
+
+  @Test
+  public void committingALaterCheckpointCommitsPreviousCheckpoints() {
+    Loggers loggers = new Loggers();
+
+    Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+
+    checkpoints.startLogging(1, new ByteArrayOutputStream());
+    checkpoints.startLogging(2, new ByteArrayOutputStream());
+    checkpoints.commitCheckpointsUntil(2);
+
+    assertThat(loggers.state(0), is(LoggerState.COMMITTED));
+    assertThat(loggers.state(1), is(LoggerState.COMMITTED));
+  }
+
+  private enum LoggerState {
+    IDLE,
+    LOGGING,
+    COMMITTED,
+    CLOSED
+  }
+
+  private static final class Loggers implements Supplier<FeedbackLogger<String>> {
+    private final List<FakeLogger> loggers = new ArrayList<>();
+
+    @Override
+    public FeedbackLogger<String> get() {
+      FakeLogger logger = new FakeLogger();
+      loggers.add(logger);
+      return logger;
+    }
+
+    List<String> items(int loggerIndex) {
+      Preconditions.checkElementIndex(loggerIndex, loggers.size());
+      FakeLogger logger = loggers.get(loggerIndex);
+      return logger.items;
+    }
+
+    LoggerState state(int loggerIndex) {
+      Preconditions.checkElementIndex(loggerIndex, loggers.size());
+      FakeLogger logger = loggers.get(loggerIndex);
+      return logger.state;
+    }
+  }
+
+  private static final class FakeLogger implements FeedbackLogger<String> {
+
+    List<String> items = new ArrayList<>();
+    LoggerState state = LoggerState.IDLE;
+
+    @Override
+    public void startLogging(OutputStream keyedStateCheckpointOutputStream) {
+      Preconditions.checkState(state == LoggerState.IDLE);
+      state = LoggerState.LOGGING;
+    }
+
+    @Override
+    public void append(String message) {
+      Preconditions.checkState(state != LoggerState.COMMITTED);
+      Preconditions.checkState(state != LoggerState.CLOSED);
+      items.add(message);
+    }
+
+    @Override
+    public void commit() {
+      Preconditions.checkState(state == LoggerState.LOGGING);
+      state = LoggerState.COMMITTED;
+    }
+
+    @Override
+    public void close() {
+      state = LoggerState.CLOSED;
+    }
+  }
+}
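
The commit-until behaviour of Checkpoints above can be illustrated with a minimal, self-contained sketch. It is not the class from this commit: CheckpointsSketch and its in-memory buffers are hypothetical stand-ins for Checkpoints<T> and FeedbackLogger<T>, and startLogging takes no output stream. It only shows that every element is appended to all uncompleted checkpoints, and that TreeMap.headMap(id, true) selects the checkpoints up to and including the given id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical stand-in for Checkpoints<T>: one in-memory buffer per uncompleted checkpoint. */
final class CheckpointsSketch {
  private final TreeMap<Long, List<String>> uncompleted = new TreeMap<>();
  private final List<List<String>> committed = new ArrayList<>();

  /** Opens a new buffer for the given checkpoint id. */
  void startLogging(long checkpointId) {
    uncompleted.put(checkpointId, new ArrayList<>());
  }

  /** Appends the element to every checkpoint that has not been committed yet. */
  void append(String element) {
    for (List<String> buffer : uncompleted.values()) {
      buffer.add(element);
    }
  }

  /** Commits all checkpoints with an id lower than or equal to the given id. */
  void commitCheckpointsUntil(long checkpointId) {
    // headMap(..., true) is an inclusive view backed by the original map.
    SortedMap<Long, List<String>> done = uncompleted.headMap(checkpointId, true);
    committed.addAll(done.values());
    // Clearing the view removes the same entries from the backing TreeMap.
    done.clear();
  }

  List<List<String>> committed() {
    return committed;
  }

  int uncompletedCount() {
    return uncompleted.size();
  }

  public static void main(String[] args) {
    CheckpointsSketch checkpoints = new CheckpointsSketch();
    checkpoints.startLogging(1);
    checkpoints.append("a");
    checkpoints.startLogging(2);
    checkpoints.append("b");
    checkpoints.commitCheckpointsUntil(1);
    checkpoints.append("c");
    checkpoints.commitCheckpointsUntil(2);
    System.out.println(checkpoints.committed()); // prints [[a, b], [b, c]]
  }
}
```

The sequence in main mirrors dataIsAppendedToMultipleLoggers in CheckpointsTest: "b" ends up in both buffers because both checkpoints were in flight when it arrived.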

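In FeedbackUnionOperator, the barrier check changed from a boolean predicate to a function returning OptionalLong, so a barrier also carries the checkpoint id it belongs to. The sketch below shows that shape in isolation. The Message type, its data/barrier constructors, and the handle method are hypothetical and exist only for illustration; the actual envelope type in the repository is different.

```java
import java.util.OptionalLong;
import java.util.function.Function;

final class BarrierSketch {

  /** Hypothetical message: either a data payload or a checkpoint barrier. */
  static final class Message {
    final String payload; // null for barrier messages
    final long checkpointId; // meaningful only for barrier messages

    private Message(String payload, long checkpointId) {
      this.payload = payload;
      this.checkpointId = checkpointId;
    }

    static Message data(String payload) {
      return new Message(payload, -1L);
    }

    static Message barrier(long checkpointId) {
      return new Message(null, checkpointId);
    }
  }

  /** Returns the checkpoint id for barriers, and an empty OptionalLong for data messages. */
  static final Function<Message, OptionalLong> IS_BARRIER =
      m -> m.payload == null ? OptionalLong.of(m.checkpointId) : OptionalLong.empty();

  /** Describes the action the operator would take for the given message. */
  static String handle(Message message) {
    OptionalLong maybeCheckpoint = IS_BARRIER.apply(message);
    if (maybeCheckpoint.isPresent()) {
      return "commit<=" + maybeCheckpoint.getAsLong();
    }
    return "append:" + message.payload;
  }

  public static void main(String[] args) {
    System.out.println(handle(Message.data("hello"))); // prints append:hello
    System.out.println(handle(Message.barrier(7))); // prints commit<=7
  }
}
```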

[flink-statefun] 02/17: [FLINK-17516] [e2e] Expose master REST port in StatefulFunctionsAppContainers

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 5244bb6b8a738aeb630255024e452061417f9440
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue May 12 23:08:25 2020 +0800

    [FLINK-17516] [e2e] Expose master REST port in StatefulFunctionsAppContainers
---
 .../flink/statefun/e2e/common/StatefulFunctionsAppContainers.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 9a3f2a3..5880ae3 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -147,6 +147,11 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     workers.forEach(GenericContainer::stop);
   }
 
+  /** @return the exposed port on master for calling REST APIs. */
+  public int getMasterRestPort() {
+    return master.getMappedPort(8081);
+  }
+
   public static final class Builder {
     private static final String MASTER_HOST = "statefun-app-master";
     private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
@@ -289,7 +294,8 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
               .withNetworkAliases(MASTER_HOST)
               .withEnv("ROLE", "master")
               .withEnv("MASTER_HOST", MASTER_HOST)
-              .withCommand("-p " + numWorkers);
+              .withCommand("-p " + numWorkers)
+              .withExposedPorts(8081);
 
       for (GenericContainer<?> dependent : dependents) {
         master.dependsOn(dependent);
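
A test could use the mapped port returned by getMasterRestPort() to build a URL for the Flink REST API. The sketch below is only an assumption about such usage: the host name "localhost", the port value 32768, and the helper jobsEndpoint are illustrative and do not appear in this commit. The /jobs path is the Flink REST endpoint that lists jobs.

```java
import java.net.URI;

final class MasterRestUriSketch {

  /** Builds the URI of the Flink REST /jobs endpoint for a host and a mapped REST port. */
  static URI jobsEndpoint(String host, int mappedRestPort) {
    return URI.create("http://" + host + ":" + mappedRestPort + "/jobs");
  }

  public static void main(String[] args) {
    // 32768 stands in for the value that getMasterRestPort() would return at runtime.
    System.out.println(jobsEndpoint("localhost", 32768)); // prints http://localhost:32768/jobs
  }
}
```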


[flink-statefun] 10/17: [FLINK-16928] Remove jobmanager.scheduler from all config files and docs

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit cabe2eaca3194a472878c7c9f771ae302ee0f237
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 14:25:46 2020 +0800

    [FLINK-16928] Remove jobmanager.scheduler from all config files and docs
    
    This closes #112.
---
 docs/deployment-and-operations/packaging.md                              | 1 -
 .../statefun-e2e-tests-common/src/main/resources/flink-conf.yaml         | 1 -
 tools/docker/flink-distribution-template/conf/flink-conf.yaml            | 1 -
 tools/k8s/templates/config-map.yaml                                      | 1 -
 4 files changed, 4 deletions(-)

diff --git a/docs/deployment-and-operations/packaging.md b/docs/deployment-and-operations/packaging.md
index c90944b..d603920 100644
--- a/docs/deployment-and-operations/packaging.md
+++ b/docs/deployment-and-operations/packaging.md
@@ -73,7 +73,6 @@ The following configurations are strictly required for running StateFun applicat
 
 {% highlight yaml %}
 classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
-jobmanager.scheduler: legacy
 execution.checkpointing.max-concurrent-checkpoints: 1
 {% endhighlight %}
 
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
index d081c43..b022b0a 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
@@ -20,6 +20,5 @@ state.backend.rocksdb.timer-service.factory: ROCKSDB
 state.checkpoints.dir: file:///checkpoint-dir
 state.backend.incremental: true
 taskmanager.memory.process.size: 4g
-jobmanager.scheduler: legacy
 execution.checkpointing.interval: 5sec
 execution.checkpointing.mode: EXACTLY_ONCE
diff --git a/tools/docker/flink-distribution-template/conf/flink-conf.yaml b/tools/docker/flink-distribution-template/conf/flink-conf.yaml
index 574ccb0..d0d4522 100644
--- a/tools/docker/flink-distribution-template/conf/flink-conf.yaml
+++ b/tools/docker/flink-distribution-template/conf/flink-conf.yaml
@@ -20,7 +20,6 @@
 
 classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
 execution.checkpointing.max-concurrent-checkpoints: 1
-jobmanager.scheduler: legacy
 
 #==============================================================================
 # Recommended configurations. Users may change according to their needs.
diff --git a/tools/k8s/templates/config-map.yaml b/tools/k8s/templates/config-map.yaml
index d6bda3e..a57e7ff 100644
--- a/tools/k8s/templates/config-map.yaml
+++ b/tools/k8s/templates/config-map.yaml
@@ -34,7 +34,6 @@ data:
     execution.checkpointing.interval: {{ .Values.checkpoint.interval }}
     taskmanager.memory.process.size: {{ .Values.worker.jvm_mem }}
     parallelism.default: {{ .Values.worker.replicas }}
-    jobmanager.scheduler: legacy
 
   log4j-console.properties: |+
     log4j.rootLogger=INFO, console


[flink-statefun] 09/17: [FLINK-16928] [core] Remove legacy scheduler config validation

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit d146a90499f47c8410e4bac1e81a4f63644c7e57
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 14:24:23 2020 +0800

    [FLINK-16928] [core] Remove legacy scheduler config validation
---
 .../flink/core/StatefulFunctionsConfigValidator.java         | 10 ----------
 .../statefun/flink/core/StatefulFunctionsConfigTest.java     | 12 ------------
 2 files changed, 22 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
index 88cc943..9a7b0d1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -26,7 +26,6 @@ import java.util.Locale;
 import java.util.Set;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 
@@ -43,7 +42,6 @@ public final class StatefulFunctionsConfigValidator {
   static void validate(Configuration configuration) {
     validateParentFirstClassloaderPatterns(configuration);
     validateMaxConcurrentCheckpoints(configuration);
-    validateLegacyScheduler(configuration);
   }
 
   private static void validateParentFirstClassloaderPatterns(Configuration configuration) {
@@ -66,14 +64,6 @@ public final class StatefulFunctionsConfigValidator {
     }
   }
 
-  private static void validateLegacyScheduler(Configuration configuration) {
-    String configuredScheduler = configuration.get(JobManagerOptions.SCHEDULER);
-    if (!"legacy".equalsIgnoreCase(configuredScheduler)) {
-      throw new StatefulFunctionsInvalidConfigException(
-          JobManagerOptions.SCHEDULER, "Currently the only supported scheduler is 'legacy'");
-    }
-  }
-
   private static Set<String> parentFirstClassloaderPatterns(Configuration configuration) {
     final String[] split =
         configuration.get(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL).split(";");
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
index cc7f8f3..5a93dd7 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.statefun.flink.core;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
@@ -45,7 +44,6 @@ public class StatefulFunctionsConfigTest {
     configuration.set(
         CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
         "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
-    configuration.set(JobManagerOptions.SCHEDULER, "legacy");
     configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
     configuration.setString("statefun.module.global-config.key1", "value1");
     configuration.setString("statefun.module.global-config.key2", "value2");
@@ -63,15 +61,6 @@ public class StatefulFunctionsConfigTest {
   }
 
   @Test(expected = StatefulFunctionsInvalidConfigException.class)
-  public void testMissingScheduler() {
-    Configuration configuration = validConfiguration();
-
-    configuration.removeConfig(JobManagerOptions.SCHEDULER);
-
-    new StatefulFunctionsConfig(configuration);
-  }
-
-  @Test(expected = StatefulFunctionsInvalidConfigException.class)
   public void invalidStrictFlinkConfigsThrows() {
     Configuration configuration = new Configuration();
     new StatefulFunctionsConfig(configuration);
@@ -90,7 +79,6 @@ public class StatefulFunctionsConfigTest {
         CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
         "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
     configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
-    configuration.set(JobManagerOptions.SCHEDULER, "legacy");
     return configuration;
   }
 }


[flink-statefun] 11/17: [FLINK-17533] Add UnboundedFeedbackLoggerFactory

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 3927424636ceecba12dca915ccf2a5130848db0b
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 21:19:22 2020 +0200

    [FLINK-17533] Add UnboundedFeedbackLoggerFactory
---
 .../logger/UnboundedFeedbackLoggerFactory.java     | 49 ++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerFactory.java
new file mode 100644
index 0000000..dd0052f
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.logger;
+
+import java.util.Objects;
+import java.util.function.Supplier;
+import java.util.function.ToIntFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.statefun.flink.core.di.Inject;
+import org.apache.flink.statefun.flink.core.di.Label;
+
+public final class UnboundedFeedbackLoggerFactory<T> {
+  private final Supplier<KeyGroupStream<T>> supplier;
+  private final ToIntFunction<T> keyGroupAssigner;
+  private final CheckpointedStreamOperations checkpointedStreamOperations;
+  private final TypeSerializer<T> serializer;
+
+  @Inject
+  public UnboundedFeedbackLoggerFactory(
+      @Label("key-group-supplier") Supplier<KeyGroupStream<T>> supplier,
+      @Label("key-group-assigner") ToIntFunction<T> keyGroupAssigner,
+      @Label("checkpoint-stream-ops") CheckpointedStreamOperations ops,
+      @Label("envelope-serializer") TypeSerializer<T> serializer) {
+    this.supplier = Objects.requireNonNull(supplier);
+    this.keyGroupAssigner = Objects.requireNonNull(keyGroupAssigner);
+    this.serializer = Objects.requireNonNull(serializer);
+    this.checkpointedStreamOperations = Objects.requireNonNull(ops);
+  }
+
+  public UnboundedFeedbackLogger<T> create() {
+    return new UnboundedFeedbackLogger<>(
+        supplier, keyGroupAssigner, checkpointedStreamOperations, serializer);
+  }
+}


[flink-statefun] 07/17: [FLINK-17516] [e2e] Implement exactly-once E2E against ExactlyOnceVerificationModule

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit bd30ec034ca0b09c5e888d0eb0c119c5e8799d32
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 18:33:29 2020 +0800

    [FLINK-17516] [e2e] Implement exactly-once E2E against ExactlyOnceVerificationModule
    
    This closes #108.
---
 .../statefun/e2e/exactlyonce/ExactlyOnceE2E.java   | 155 +++++++++++++++++++++
 .../src/test/resources/Dockerfile                  |  20 +++
 .../src/test/resources/log4j.properties            |  24 ++++
 3 files changed, 199 insertions(+)

diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java
new file mode 100644
index 0000000..b0c61d0
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
+import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
+import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+
+/**
+ * End-to-end test based on the {@link ExactlyOnceVerificationModule} application.
+ *
+ * <p>This test writes some {@link WrappedMessage} records to Kafka, which eventually get routed to
+ * the counter function in the application. Then, after the corresponding {@link InvokeCount}s are
+ * seen in the Kafka egress (which implies some checkpoints have been completed since the
+ * verification application is using exactly-once delivery), we restart a worker to simulate
+ * failure. The application should automatically attempt to recover and eventually restart.
+ * Meanwhile, more records are written to Kafka again. We verify that on the consumer side, the
+ * invocation counts increase sequentially for each key as if the failure did not occur.
+ */
+public class ExactlyOnceE2E {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceE2E.class);
+
+  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
+  private static final String KAFKA_HOST = "kafka-broker";
+
+  private static final int NUM_WORKERS = 2;
+
+  /**
+   * Kafka broker. We need to explicitly set the transaction state log replication factor and min
+   * ISR since by default, those values are larger than 1 while we are only using 1 Kafka broker.
+   */
+  @Rule
+  public KafkaContainer kafka =
+      new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
+          .withNetworkAliases(KAFKA_HOST)
+          .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+          .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
+
+  @Rule
+  public StatefulFunctionsAppContainers verificationApp =
+      StatefulFunctionsAppContainers.builder("exactly-once-verification", NUM_WORKERS)
+          .dependsOn(kafka)
+          .exposeMasterLogs(LOG)
+          .withModuleGlobalConfiguration(
+              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
+          .build();
+
+  @Test(timeout = 300_000L)
+  public void run() throws Exception {
+    final String kafkaAddress = kafka.getBootstrapServers();
+
+    final Producer<String, WrappedMessage> messageProducer =
+        kafkaWrappedMessagesProducer(kafkaAddress);
+    final Consumer<String, InvokeCount> invokeCountsConsumer =
+        kafkaInvokeCountsConsumer(kafkaAddress);
+
+    final KafkaIOVerifier<String, WrappedMessage, String, InvokeCount> verifier =
+        new KafkaIOVerifier<>(messageProducer, invokeCountsConsumer);
+
+    assertThat(
+        verifier.sending(wrappedMessage("foo"), wrappedMessage("foo"), wrappedMessage("foo")),
+        verifier.resultsInOrder(
+            is(invokeCount("foo", 1)), is(invokeCount("foo", 2)), is(invokeCount("foo", 3))));
+
+    LOG.info(
+        "Restarting random worker to simulate failure. The application should automatically recover.");
+    verificationApp.restartWorker(randomWorkerIndex());
+
+    assertThat(
+        verifier.sending(wrappedMessage("foo"), wrappedMessage("foo"), wrappedMessage("foo")),
+        verifier.resultsInOrder(
+            is(invokeCount("foo", 4)), is(invokeCount("foo", 5)), is(invokeCount("foo", 6))));
+  }
+
+  private static Producer<String, WrappedMessage> kafkaWrappedMessagesProducer(
+      String bootstrapServers) {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", bootstrapServers);
+
+    return new KafkaProducer<>(
+        props, new StringSerializer(), new KafkaProtobufSerializer<>(WrappedMessage.parser()));
+  }
+
+  private Consumer<String, InvokeCount> kafkaInvokeCountsConsumer(String bootstrapServers) {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
+    consumerProps.setProperty("group.id", "exactly-once-e2e");
+    consumerProps.setProperty("auto.offset.reset", "earliest");
+    consumerProps.setProperty("isolation.level", "read_committed");
+
+    KafkaConsumer<String, InvokeCount> consumer =
+        new KafkaConsumer<>(
+            consumerProps,
+            new StringDeserializer(),
+            new KafkaProtobufSerializer<>(InvokeCount.parser()));
+    consumer.subscribe(Collections.singletonList(KafkaIO.INVOKE_COUNTS_TOPIC_NAME));
+
+    return consumer;
+  }
+
+  private static ProducerRecord<String, WrappedMessage> wrappedMessage(String targetInvokeId) {
+    final String key = UUID.randomUUID().toString();
+
+    return new ProducerRecord<>(
+        KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME,
+        key,
+        WrappedMessage.newBuilder().setInvokeTargetId(targetInvokeId).setKey(key).build());
+  }
+
+  private static InvokeCount invokeCount(String id, int count) {
+    return InvokeCount.newBuilder().setId(id).setInvokeCount(count).build();
+  }
+
+  private static int randomWorkerIndex() {
+    return new Random().nextInt(NUM_WORKERS);
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile
new file mode 100644
index 0000000..7b2cca6
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM flink-statefun:2.1-SNAPSHOT
+
+RUN mkdir -p /opt/statefun/modules/statefun-exactly-once-e2e
+COPY statefun-exactly-once-e2e*.jar /opt/statefun/modules/statefun-exactly-once-e2e/
+COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb965d3
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+# Log all INFO-level messages to the console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
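
The ExactlyOnceE2E Javadoc above describes the property under test: after a worker restart, invocation counts seen by a read_committed consumer keep increasing sequentially per key. The following is a toy sketch of why that holds, not StateFun or Flink code. Every name in it (ExactlyOnceSketch, process, checkpoint, failAndRestore) is hypothetical, and it assumes a simplified model in which a checkpoint atomically snapshots the counter state and the input offset and commits the pending output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not StateFun code): a per-key counter with checkpointed state and input offset. */
final class ExactlyOnceSketch {
  private Map<String, Integer> counts = new HashMap<>();
  private int offset = 0; // next input position to read
  private Map<String, Integer> snapshotCounts = new HashMap<>();
  private int snapshotOffset = 0;
  private final List<String> pending = new ArrayList<>(); // output not yet committed
  private final List<String> committed = new ArrayList<>(); // what a read_committed consumer sees

  /** Consumes all unread input, bumping the per-key count and buffering the output. */
  void process(List<String> input) {
    while (offset < input.size()) {
      String key = input.get(offset++);
      pending.add(key + ":" + counts.merge(key, 1, Integer::sum));
    }
  }

  /** Snapshots state and offset together, then commits the buffered output. */
  void checkpoint() {
    snapshotCounts = new HashMap<>(counts);
    snapshotOffset = offset;
    committed.addAll(pending);
    pending.clear();
  }

  /** Simulates a failure: uncommitted output is dropped, state and offset roll back. */
  void failAndRestore() {
    counts = new HashMap<>(snapshotCounts);
    offset = snapshotOffset;
    pending.clear();
  }

  static List<String> run() {
    ExactlyOnceSketch app = new ExactlyOnceSketch();
    List<String> input = new ArrayList<>(Arrays.asList("foo", "foo", "foo"));
    app.process(input);
    app.checkpoint(); // foo:1..foo:3 become visible
    input.add("foo");
    app.process(input); // produces an uncommitted foo:4
    app.failAndRestore(); // foo:4 is discarded; the fourth record will be replayed
    input.addAll(Arrays.asList("foo", "foo"));
    app.process(input);
    app.checkpoint(); // foo:4..foo:6 become visible exactly once
    return app.committed;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [foo:1, foo:2, foo:3, foo:4, foo:5, foo:6]
  }
}
```

In this model the uncommitted foo:4 produced before the failure never reaches the consumer, and the replay after recovery produces it again from the restored count of 3, so the committed sequence has neither gaps nor duplicates.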


[flink-statefun] 15/17: [FLINK-17533] Remove concurrent checkpoints limitation

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 6425f42d9a62ce3185f0fd3eb284a5756f992445
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 23:03:36 2020 +0200

    [FLINK-17533] Remove concurrent checkpoints limitation
    
    This closes #105.
---
 docs/deployment-and-operations/packaging.md                  |  1 -
 .../flink/core/StatefulFunctionsConfigValidator.java         | 12 ------------
 .../docker/flink-distribution-template/conf/flink-conf.yaml  |  1 -
 3 files changed, 14 deletions(-)

diff --git a/docs/deployment-and-operations/packaging.md b/docs/deployment-and-operations/packaging.md
index d603920..7af18f1 100644
--- a/docs/deployment-and-operations/packaging.md
+++ b/docs/deployment-and-operations/packaging.md
@@ -73,6 +73,5 @@ The following configurations are strictly required for running StateFun applicat
 
 {% highlight yaml %}
 classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
-execution.checkpointing.max-concurrent-checkpoints: 1
 {% endhighlight %}
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
index 9a7b0d1..c4f658c 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -27,7 +27,6 @@ import java.util.Set;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 
 public final class StatefulFunctionsConfigValidator {
 
@@ -41,7 +40,6 @@ public final class StatefulFunctionsConfigValidator {
 
   static void validate(Configuration configuration) {
     validateParentFirstClassloaderPatterns(configuration);
-    validateMaxConcurrentCheckpoints(configuration);
   }
 
   private static void validateParentFirstClassloaderPatterns(Configuration configuration) {
@@ -54,16 +52,6 @@ public final class StatefulFunctionsConfigValidator {
     }
   }
 
-  private static void validateMaxConcurrentCheckpoints(Configuration configuration) {
-    final int maxConcurrentCheckpoints =
-        configuration.get(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS);
-    if (maxConcurrentCheckpoints != 1) {
-      throw new StatefulFunctionsInvalidConfigException(
-          ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS,
-          "Value must be 1, Stateful Functions does not support concurrent checkpoints.");
-    }
-  }
-
   private static Set<String> parentFirstClassloaderPatterns(Configuration configuration) {
     final String[] split =
         configuration.get(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL).split(";");
diff --git a/tools/docker/flink-distribution-template/conf/flink-conf.yaml b/tools/docker/flink-distribution-template/conf/flink-conf.yaml
index d0d4522..430c4cb 100644
--- a/tools/docker/flink-distribution-template/conf/flink-conf.yaml
+++ b/tools/docker/flink-distribution-template/conf/flink-conf.yaml
@@ -19,7 +19,6 @@
 #==============================================================================
 
 classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
-execution.checkpointing.max-concurrent-checkpoints: 1
 
 #==============================================================================
 # Recommended configurations. Users may change according to their needs.


[flink-statefun] 03/17: [FLINK-17516] [e2e] Enable exactly-once checkpointing in all e2e tests

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 84afed935260e3c538fa758f361b39c03f6ac33b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue May 12 23:24:06 2020 +0800

    [FLINK-17516] [e2e] Enable exactly-once checkpointing in all e2e tests
---
 .../statefun-e2e-tests-common/src/main/resources/flink-conf.yaml        | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
index 7d7c307..d081c43 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
@@ -21,3 +21,5 @@ state.checkpoints.dir: file:///checkpoint-dir
 state.backend.incremental: true
 taskmanager.memory.process.size: 4g
 jobmanager.scheduler: legacy
+execution.checkpointing.interval: 5sec
+execution.checkpointing.mode: EXACTLY_ONCE


[flink-statefun] 13/17: [FLINK-17533] Expose checkpointId from barrier messages

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 4aac946624eed8a0b9f334acad550de7e2e3ad7e
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 22:27:02 2020 +0200

    [FLINK-17533] Expose checkpointId from barrier messages
    
    This commit changes the boolean method isBarrierMessage() to
    return an OptionalLong that would carry the checkpointId when the message
    is a checkpoint barrier.
---
 .../flink/core/feedback/FeedbackUnionOperatorFactory.java     |  6 +++---
 .../statefun/flink/core/functions/AsyncMessageDecorator.java  |  5 +++--
 .../org/apache/flink/statefun/flink/core/message/Message.java | 11 ++++++++++-
 .../flink/statefun/flink/core/message/ProtobufMessage.java    |  9 +++++++--
 .../apache/flink/statefun/flink/core/message/SdkMessage.java  |  5 +++--
 .../flink/statefun/flink/core/translation/FlinkUniverse.java  |  7 ++++---
 6 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
index 580ad2d..f87128d 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
@@ -18,10 +18,10 @@
 package org.apache.flink.statefun.flink.core.feedback;
 
 import java.util.Objects;
+import java.util.OptionalLong;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.common.SerializableFunction;
-import org.apache.flink.statefun.flink.core.common.SerializablePredicate;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.*;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -35,7 +35,7 @@ public final class FeedbackUnionOperatorFactory<E>
   private final StatefulFunctionsConfig configuration;
 
   private final FeedbackKey<E> feedbackKey;
-  private final SerializablePredicate<E> isBarrierMessage;
+  private final SerializableFunction<E, OptionalLong> isBarrierMessage;
   private final SerializableFunction<E, ?> keySelector;
 
   private transient MailboxExecutor mailboxExecutor;
@@ -43,7 +43,7 @@ public final class FeedbackUnionOperatorFactory<E>
   public FeedbackUnionOperatorFactory(
       StatefulFunctionsConfig configuration,
       FeedbackKey<E> feedbackKey,
-      SerializablePredicate<E> isBarrierMessage,
+      SerializableFunction<E, OptionalLong> isBarrierMessage,
       SerializableFunction<E, ?> keySelector) {
     this.feedbackKey = Objects.requireNonNull(feedbackKey);
     this.isBarrierMessage = Objects.requireNonNull(isBarrierMessage);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
index ee6e869..c77adb7 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.statefun.flink.core.functions;
 
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.flink.core.message.Message;
@@ -87,8 +88,8 @@ final class AsyncMessageDecorator<T> implements Message {
   }
 
   @Override
-  public boolean isBarrierMessage() {
-    return false;
+  public OptionalLong isBarrierMessage() {
+    return OptionalLong.empty();
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java
index 2e4cf99..9b0cbee 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java
@@ -18,6 +18,7 @@
 package org.apache.flink.statefun.flink.core.message;
 
 import java.io.IOException;
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.sdk.Address;
@@ -31,7 +32,15 @@ public interface Message {
 
   Object payload(MessageFactory context, ClassLoader targetClassLoader);
 
-  boolean isBarrierMessage();
+  /**
+   * Returns an empty {@code OptionalLong} for non-barrier messages, or the wrapped checkpointId
+   * for barrier messages.
+   *
+   * <p>When this message represents a checkpoint barrier, this method returns an {@code
+   * OptionalLong} holding the id of the checkpoint that produced that barrier. For other types of
+   * messages (e.g. {@code Payload}) this method returns an empty {@code OptionalLong}.
+   */
+  OptionalLong isBarrierMessage();
 
   Message copy(MessageFactory context);
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java
index 8880a5d..dabda14 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.message;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.flink.core.generated.Envelope;
@@ -72,8 +73,12 @@ final class ProtobufMessage implements Message {
   }
 
   @Override
-  public boolean isBarrierMessage() {
-    return envelope.hasCheckpoint();
+  public OptionalLong isBarrierMessage() {
+    if (!envelope.hasCheckpoint()) {
+      return OptionalLong.empty();
+    }
+    final long checkpointId = envelope.getCheckpoint().getCheckpointId();
+    return OptionalLong.of(checkpointId);
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java
index cfc785c..c10f2e9 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.message;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.flink.core.generated.Envelope;
@@ -62,8 +63,8 @@ final class SdkMessage implements Message {
   }
 
   @Override
-  public boolean isBarrierMessage() {
-    return false;
+  public OptionalLong isBarrierMessage() {
+    return OptionalLong.empty();
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
index ff8df49..df61045 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.translation;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.function.LongFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
@@ -26,7 +27,6 @@ import org.apache.flink.statefun.flink.core.StatefulFunctionsJobConstants;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.common.KeyBy;
 import org.apache.flink.statefun.flink.core.common.SerializableFunction;
-import org.apache.flink.statefun.flink.core.common.SerializablePredicate;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperatorFactory;
@@ -123,12 +123,13 @@ public final class FlinkUniverse {
     c.getTransformation().setParallelism(b.getParallelism());
   }
 
-  private static final class IsCheckpointBarrier implements SerializablePredicate<Message> {
+  private static final class IsCheckpointBarrier
+      implements SerializableFunction<Message, OptionalLong> {
 
     private static final long serialVersionUID = 1;
 
     @Override
-    public boolean test(Message message) {
+    public OptionalLong apply(Message message) {
       return message.isBarrierMessage();
     }
   }
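
The change above replaces a boolean check with an OptionalLong, so a caller learns both whether a message is a barrier and which checkpoint it belongs to. The following is a minimal sketch of how a caller might consume such a return value. BarrierSketch, Msg and describe are hypothetical stand-ins introduced for illustration and are not part of the StateFun code base; only java.util.OptionalLong is a real API here.

```java
import java.util.OptionalLong;

/** Illustration only: consuming an OptionalLong-returning barrier check. */
final class BarrierSketch {

  /** Hypothetical stand-in for the Message interface; only the barrier check is modeled. */
  interface Msg {
    OptionalLong isBarrierMessage();
  }

  /** Branches on presence, then reads the checkpoint id only when one exists. */
  static String describe(Msg message) {
    OptionalLong checkpointId = message.isBarrierMessage();
    if (checkpointId.isPresent()) {
      return "barrier for checkpoint " + checkpointId.getAsLong();
    }
    return "regular message";
  }

  public static void main(String[] args) {
    System.out.println(describe(() -> OptionalLong.of(7L))); // barrier for checkpoint 7
    System.out.println(describe(OptionalLong::empty)); // regular message
  }
}
```

Compared with a boolean plus a separate id getter, the single OptionalLong return makes it impossible to read a checkpoint id from a message that is not a barrier.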


[flink-statefun] 17/17: [hotfix] Avoid raw types in UnboundedFeedbackLoggerTest

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 50b4d1cea6de9fcc49e7a376b200dab56f61af3c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 18:56:38 2020 +0800

    [hotfix] Avoid raw types in UnboundedFeedbackLoggerTest
---
 .../flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
index 75c4c55..dd7088f 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
@@ -50,7 +50,7 @@ public class UnboundedFeedbackLoggerTest {
 
   @Test
   public void sanity() {
-    UnboundedFeedbackLogger logger = instanceUnderTest(128, 1);
+    UnboundedFeedbackLogger<Integer> logger = instanceUnderTest(128, 1);
 
     ByteArrayOutputStream output = new ByteArrayOutputStream();
     logger.startLogging(output);
@@ -61,7 +61,7 @@ public class UnboundedFeedbackLoggerTest {
 
   @Test(expected = IllegalStateException.class)
   public void commitWithoutStartLoggingShouldBeIllegal() {
-    UnboundedFeedbackLogger logger = instanceUnderTest(128, 1);
+    UnboundedFeedbackLogger<Integer> logger = instanceUnderTest(128, 1);
 
     logger.commit();
   }