Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/15 11:13:54 UTC

[flink-statefun] branch release-2.0 updated (1d73774 -> 12576da)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 1d73774  [FLINK-17684] [e2e] Enable E2E tests in Travis builds
     new 3f3da7a  [FLINK-17516] [e2e] Refactor StatefulFunctionsAppContainers to use Builder pattern
     new ba0fe7b  [FLINK-17516] [e2e] Expose master REST port in StatefulFunctionsAppContainers
     new c03683b  [FLINK-17516] [e2e] Enable exactly-once checkpointing in all e2e tests
     new c911987  [FLINK-17516] [e2e] Bind checkpoint dirs to temporary local directory
     new 168e585  [FLINK-17516] [e2e] Allow restarting workers with StatefulFunctionsAppContainers
     new 89c9f62  [FLINK-17516] [e2e] Add verification app for exactly-once E2E
     new 1057422  [FLINK-17516] [e2e] Implement exactly-once E2E against ExactlyOnceVerificationModule
     new 0fd74b4  [FLINK-17712] [build] Upgrade Flink version to 1.10.1
     new 2e8b618  [FLINK-16928] [core] Remove legacy scheduler config validation
     new e86890d  [FLINK-16928] Remove jobmanager.scheduler from all config files and docs
     new a025b92  [FLINK-17533] Add UnboundedFeedbackLoggerFactory
     new faeffdc  [FLINK-17533] Expose the UnboundedFeedbackLoggerFactory
     new ac6f253  [FLINK-17533] Expose checkpointId from barrier messages
     new eac06fb  [FLINK-17533] Add support for multiple concurrent checkpoints
     new 6e76280  [FLINK-17533] Remove concurrent checkpoints limitation
     new c97b706  [hotfix] Remove unused code
     new 12576da  [hotfix] Avoid raw types in UnboundedFeedbackLoggerTest

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/deployment-and-operations/packaging.md        |   2 -
 pom.xml                                            |   2 +-
 statefun-e2e-tests/pom.xml                         |   1 +
 .../e2e/common/StatefulFunctionsAppContainers.java | 363 ++++++++++++---------
 .../src/main/resources/flink-conf.yaml             |   3 +-
 .../pom.xml                                        |  19 +-
 .../flink/statefun/e2e/exactlyonce}/Constants.java |  19 +-
 .../ExactlyOnceVerificationModule.java}            |  25 +-
 .../flink/statefun/e2e/exactlyonce/FnCounter.java  |  30 +-
 .../statefun/e2e/exactlyonce/FnUnwrapper.java      |  21 +-
 .../flink/statefun/e2e/exactlyonce}/KafkaIO.java   |  50 +--
 .../main/protobuf/exactly-once-verification.proto} |  22 +-
 .../statefun/e2e/exactlyonce/ExactlyOnceE2E.java   | 155 +++++++++
 .../src/test/resources/Dockerfile                  |   6 +-
 .../src/test/resources/log4j.properties            |   0
 .../e2e/routablekafka/RoutableKafkaE2E.java        |   5 +-
 .../statefun/e2e/sanity/SanityVerificationE2E.java |   5 +-
 .../core/StatefulFunctionsConfigValidator.java     |  22 --
 .../flink/core/common/SerializablePredicate.java   |  23 --
 .../statefun/flink/core/feedback/Checkpoints.java  |  61 ++++
 .../flink/core/feedback/FeedbackUnionOperator.java |  34 +-
 .../feedback/FeedbackUnionOperatorFactory.java     |   6 +-
 .../core/functions/AsyncMessageDecorator.java      |   5 +-
 ...edStreamOperations.java => FeedbackLogger.java} |  14 +-
 .../flink/statefun/flink/core/logger/Loggers.java  |   7 +-
 .../flink/core/logger/UnboundedFeedbackLogger.java |  16 +-
 ...ry.java => UnboundedFeedbackLoggerFactory.java} |  30 +-
 .../flink/statefun/flink/core/message/Message.java |  11 +-
 .../flink/core/message/ProtobufMessage.java        |   9 +-
 .../statefun/flink/core/message/SdkMessage.java    |   5 +-
 .../flink/core/translation/FlinkUniverse.java      |   7 +-
 .../flink/core/StatefulFunctionsConfigTest.java    |  12 -
 .../flink/core/feedback/CheckpointsTest.java       | 143 ++++++++
 .../core/logger/UnboundedFeedbackLoggerTest.java   |   6 +-
 tools/docker/Dockerfile                            |   2 +-
 .../conf/flink-conf.yaml                           |   2 -
 tools/k8s/templates/config-map.yaml                |   1 -
 37 files changed, 783 insertions(+), 361 deletions(-)
 copy statefun-e2e-tests/{statefun-sanity-e2e => statefun-exactly-once-e2e}/pom.xml (93%)
 copy statefun-e2e-tests/{statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka => statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce}/Constants.java (61%)
 copy statefun-e2e-tests/{statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java => statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java} (60%)
 copy statefun-examples/statefun-flink-harness-example/src/main/java/org/apache/flink/statefun/examples/harness/MyFunction.java => statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java (51%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/grpcfn/GrpcFunction.java => statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java (61%)
 copy statefun-e2e-tests/{statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity => statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce}/KafkaIO.java (54%)
 copy statefun-e2e-tests/{statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto => statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto} (69%)
 create mode 100644 statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java
 copy statefun-e2e-tests/{statefun-sanity-e2e => statefun-exactly-once-e2e}/src/test/resources/Dockerfile (82%)
 copy statefun-e2e-tests/{statefun-sanity-e2e => statefun-exactly-once-e2e}/src/test/resources/log4j.properties (100%)
 delete mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/SerializablePredicate.java
 create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/{CheckpointedStreamOperations.java => FeedbackLogger.java} (72%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/{KeyGroupStreamFactory.java => UnboundedFeedbackLoggerFactory.java} (55%)
 create mode 100644 statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java


[flink-statefun] 09/17: [FLINK-16928] [core] Remove legacy scheduler config validation

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 2e8b61874bdb2eff6dd92cef7e2d1445afbbf888
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 14:24:23 2020 +0800

    [FLINK-16928] [core] Remove legacy scheduler config validation
---
 .../flink/core/StatefulFunctionsConfigValidator.java         | 10 ----------
 .../statefun/flink/core/StatefulFunctionsConfigTest.java     | 12 ------------
 2 files changed, 22 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
index 88cc943..9a7b0d1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -26,7 +26,6 @@ import java.util.Locale;
 import java.util.Set;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 
@@ -43,7 +42,6 @@ public final class StatefulFunctionsConfigValidator {
   static void validate(Configuration configuration) {
     validateParentFirstClassloaderPatterns(configuration);
     validateMaxConcurrentCheckpoints(configuration);
-    validateLegacyScheduler(configuration);
   }
 
   private static void validateParentFirstClassloaderPatterns(Configuration configuration) {
@@ -66,14 +64,6 @@ public final class StatefulFunctionsConfigValidator {
     }
   }
 
-  private static void validateLegacyScheduler(Configuration configuration) {
-    String configuredScheduler = configuration.get(JobManagerOptions.SCHEDULER);
-    if (!"legacy".equalsIgnoreCase(configuredScheduler)) {
-      throw new StatefulFunctionsInvalidConfigException(
-          JobManagerOptions.SCHEDULER, "Currently the only supported scheduler is 'legacy'");
-    }
-  }
-
   private static Set<String> parentFirstClassloaderPatterns(Configuration configuration) {
     final String[] split =
         configuration.get(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL).split(";");
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
index cc7f8f3..5a93dd7 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.statefun.flink.core;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
@@ -45,7 +44,6 @@ public class StatefulFunctionsConfigTest {
     configuration.set(
         CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
         "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
-    configuration.set(JobManagerOptions.SCHEDULER, "legacy");
     configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
     configuration.setString("statefun.module.global-config.key1", "value1");
     configuration.setString("statefun.module.global-config.key2", "value2");
@@ -63,15 +61,6 @@ public class StatefulFunctionsConfigTest {
   }
 
   @Test(expected = StatefulFunctionsInvalidConfigException.class)
-  public void testMissingScheduler() {
-    Configuration configuration = validConfiguration();
-
-    configuration.removeConfig(JobManagerOptions.SCHEDULER);
-
-    new StatefulFunctionsConfig(configuration);
-  }
-
-  @Test(expected = StatefulFunctionsInvalidConfigException.class)
   public void invalidStrictFlinkConfigsThrows() {
     Configuration configuration = new Configuration();
     new StatefulFunctionsConfig(configuration);
@@ -90,7 +79,6 @@ public class StatefulFunctionsConfigTest {
         CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
         "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
     configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
-    configuration.set(JobManagerOptions.SCHEDULER, "legacy");
     return configuration;
   }
 }
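
In operational terms, removing this check means the scheduler entry that the validator used to demand (see the deleted validateLegacyScheduler above) no longer has to appear in flink-conf.yaml once the Flink upgrade in commit 08/17 is in place. A hedged sketch of the affected configuration, for illustration only:

    # Previously the StateFun validator rejected configurations that did not set this;
    # after this change (together with the upgrade to Flink 1.10.1) the key can be omitted:
    #
    # jobmanager.scheduler: legacy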


[flink-statefun] 02/17: [FLINK-17516] [e2e] Expose master REST port in StatefulFunctionsAppContainers

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit ba0fe7b1f0e45148680e0ee82701061f2a979c6a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue May 12 23:08:25 2020 +0800

    [FLINK-17516] [e2e] Expose master REST port in StatefulFunctionsAppContainers
---
 .../flink/statefun/e2e/common/StatefulFunctionsAppContainers.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 9a3f2a3..5880ae3 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -147,6 +147,11 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     workers.forEach(GenericContainer::stop);
   }
 
+  /** @return the exposed port on master for calling REST APIs. */
+  public int getMasterRestPort() {
+    return master.getMappedPort(8081);
+  }
+
   public static final class Builder {
     private static final String MASTER_HOST = "statefun-app-master";
     private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
@@ -289,7 +294,8 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
               .withNetworkAliases(MASTER_HOST)
               .withEnv("ROLE", "master")
               .withEnv("MASTER_HOST", MASTER_HOST)
-              .withCommand("-p " + numWorkers);
+              .withCommand("-p " + numWorkers)
+              .withExposedPorts(8081);
 
       for (GenericContainer<?> dependent : dependents) {
         master.dependsOn(dependent);
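
Exposing port 8081 is what later lets tests reach the Flink master's REST API from the host. A minimal sketch of how a test could use the new getMasterRestPort() accessor (illustrative only: the RestProbe helper is hypothetical, the host is assumed to be localhost for Testcontainers port mappings, and OkHttp is already a test dependency of the e2e modules):

    import java.io.IOException;
    import okhttp3.OkHttpClient;
    import okhttp3.Request;
    import okhttp3.Response;
    import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;

    final class RestProbe {
      private RestProbe() {}

      /** Fetches the Flink jobs overview through the master's mapped REST port. */
      static String jobsOverview(StatefulFunctionsAppContainers app) throws IOException {
        OkHttpClient client = new OkHttpClient();
        Request request =
            new Request.Builder()
                .url("http://localhost:" + app.getMasterRestPort() + "/jobs/overview")
                .build();
        try (Response response = client.newCall(request).execute()) {
          // /jobs/overview is a standard Flink REST endpoint listing running jobs.
          return response.body().string();
        }
      }
    }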


[flink-statefun] 11/17: [FLINK-17533] Add UnboundedFeedbackLoggerFactory

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit a025b92493cf9563551eeb4cd37844d05a5004d6
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 21:19:22 2020 +0200

    [FLINK-17533] Add UnboundedFeedbackLoggerFactory
---
 .../logger/UnboundedFeedbackLoggerFactory.java     | 49 ++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerFactory.java
new file mode 100644
index 0000000..dd0052f
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.logger;
+
+import java.util.Objects;
+import java.util.function.Supplier;
+import java.util.function.ToIntFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.statefun.flink.core.di.Inject;
+import org.apache.flink.statefun.flink.core.di.Label;
+
+public final class UnboundedFeedbackLoggerFactory<T> {
+  private final Supplier<KeyGroupStream<T>> supplier;
+  private final ToIntFunction<T> keyGroupAssigner;
+  private final CheckpointedStreamOperations checkpointedStreamOperations;
+  private final TypeSerializer<T> serializer;
+
+  @Inject
+  public UnboundedFeedbackLoggerFactory(
+      @Label("key-group-supplier") Supplier<KeyGroupStream<T>> supplier,
+      @Label("key-group-assigner") ToIntFunction<T> keyGroupAssigner,
+      @Label("checkpoint-stream-ops") CheckpointedStreamOperations ops,
+      @Label("envelope-serializer") TypeSerializer<T> serializer) {
+    this.supplier = Objects.requireNonNull(supplier);
+    this.keyGroupAssigner = Objects.requireNonNull(keyGroupAssigner);
+    this.serializer = Objects.requireNonNull(serializer);
+    this.checkpointedStreamOperations = Objects.requireNonNull(ops);
+  }
+
+  public UnboundedFeedbackLogger<T> create() {
+    return new UnboundedFeedbackLogger<>(
+        supplier, keyGroupAssigner, checkpointedStreamOperations, serializer);
+  }
+}
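
The factory captures the same four collaborators that UnboundedFeedbackLogger needs, so a single factory can hand out a fresh logger whenever one is required, for example once per concurrent checkpoint, which the later commits in this series build on. A minimal usage sketch, assuming the caller sits in the same logger package and already has the supplier, assigner, checkpointed-stream operations and envelope serializer in scope:

    import java.util.function.Supplier;
    import java.util.function.ToIntFunction;
    import org.apache.flink.api.common.typeutils.TypeSerializer;

    final class FeedbackLoggerWiring {
      private FeedbackLoggerWiring() {}

      /** Builds a factory once and creates a logger instance from it on demand. */
      static <T> UnboundedFeedbackLogger<T> newLogger(
          Supplier<KeyGroupStream<T>> supplier,
          ToIntFunction<T> keyGroupAssigner,
          CheckpointedStreamOperations ops,
          TypeSerializer<T> serializer) {
        UnboundedFeedbackLoggerFactory<T> factory =
            new UnboundedFeedbackLoggerFactory<>(supplier, keyGroupAssigner, ops, serializer);
        return factory.create();
      }
    }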


[flink-statefun] 12/17: [FLINK-17533] Expose the UnboundedFeedbackLoggerFactory

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit faeffdc089cbe0ea52bdbdf998d19893dbd68aa6
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 21:32:07 2020 +0200

    [FLINK-17533] Expose the UnboundedFeedbackLoggerFactory
---
 .../flink/statefun/flink/core/logger/Loggers.java     | 19 +++++++++++++++++--
 .../flink/core/logger/UnboundedFeedbackLogger.java    | 11 ++++-------
 .../core/logger/UnboundedFeedbackLoggerTest.java      |  2 +-
 3 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
index 62720bf..948a808 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
@@ -43,10 +43,25 @@ public final class Loggers {
       TypeSerializer<?> serializer,
       Function<?, ?> keySelector) {
 
+    UnboundedFeedbackLoggerFactory<?> factory =
+        unboundedSpillableLoggerFactory(
+            ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector);
+
+    return factory.create();
+  }
+
+  public static UnboundedFeedbackLoggerFactory<?> unboundedSpillableLoggerFactory(
+      IOManager ioManager,
+      int maxParallelism,
+      long inMemoryMaxBufferSize,
+      TypeSerializer<?> serializer,
+      Function<?, ?> keySelector) {
+
     ObjectContainer container =
         unboundedSpillableLoggerContainer(
             ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector);
-    return container.get(UnboundedFeedbackLogger.class);
+
+    return container.get(UnboundedFeedbackLoggerFactory.class);
   }
 
   /** Wires the required dependencies to construct an {@link UnboundedFeedbackLogger}. */
@@ -70,7 +85,7 @@ public final class Loggers {
         "checkpoint-stream-ops",
         CheckpointedStreamOperations.class,
         KeyedStateCheckpointOutputStreamOps.INSTANCE);
-    container.add(UnboundedFeedbackLogger.class);
+    container.add(UnboundedFeedbackLoggerFactory.class);
     return container;
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
index 8ddc22e..ef0360a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
@@ -35,8 +35,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.statefun.flink.core.di.Inject;
-import org.apache.flink.statefun.flink.core.di.Label;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
 import org.apache.flink.util.IOUtils;
 
@@ -50,12 +48,11 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
   private TypeSerializer<T> serializer;
   private Closeable snapshotLease;
 
-  @Inject
   public UnboundedFeedbackLogger(
-      @Label("key-group-supplier") Supplier<KeyGroupStream<T>> supplier,
-      @Label("key-group-assigner") ToIntFunction<T> keyGroupAssigner,
-      @Label("checkpoint-stream-ops") CheckpointedStreamOperations ops,
-      @Label("envelope-serializer") TypeSerializer<T> serializer) {
+      Supplier<KeyGroupStream<T>> supplier,
+      ToIntFunction<T> keyGroupAssigner,
+      CheckpointedStreamOperations ops,
+      TypeSerializer<T> serializer) {
     this.supplier = Objects.requireNonNull(supplier);
     this.keyGroupAssigner = Objects.requireNonNull(keyGroupAssigner);
     this.serializer = Objects.requireNonNull(serializer);
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
index 8b66b21..75c4c55 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
@@ -115,7 +115,7 @@ public class UnboundedFeedbackLoggerTest {
             IO_MANAGER, maxParallelism, totalMemory, IntSerializer.INSTANCE, Function.identity());
 
     container.add("checkpoint-stream-ops", CheckpointedStreamOperations.class, NOOP.INSTANCE);
-    return container.get(UnboundedFeedbackLogger.class);
+    return container.get(UnboundedFeedbackLoggerFactory.class).create();
   }
 
   enum NOOP implements CheckpointedStreamOperations {


[flink-statefun] 06/17: [FLINK-17516] [e2e] Add verification app for exactly-once E2E

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 89c9f62ec8327f154ce3e8b3fbaf32cad735fd7d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 18:33:05 2020 +0800

    [FLINK-17516] [e2e] Add verification app for exactly-once E2E
---
 statefun-e2e-tests/pom.xml                         |   1 +
 .../statefun-exactly-once-e2e/pom.xml              | 109 +++++++++++++++++++++
 .../flink/statefun/e2e/exactlyonce/Constants.java  |  39 ++++++++
 .../exactlyonce/ExactlyOnceVerificationModule.java |  65 ++++++++++++
 .../flink/statefun/e2e/exactlyonce/FnCounter.java  |  47 +++++++++
 .../statefun/e2e/exactlyonce/FnUnwrapper.java      |  40 ++++++++
 .../flink/statefun/e2e/exactlyonce/KafkaIO.java    |  92 +++++++++++++++++
 .../main/protobuf/exactly-once-verification.proto  |  39 ++++++++
 8 files changed, 432 insertions(+)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 218ed18..83cba8d 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -32,6 +32,7 @@ under the License.
         <module>statefun-e2e-tests-common</module>
         <module>statefun-sanity-e2e</module>
         <module>statefun-routable-kafka-e2e</module>
+        <module>statefun-exactly-once-e2e</module>
     </modules>
 
     <build>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
new file mode 100644
index 0000000..0ea4d6a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>statefun-e2e-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>2.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-exactly-once-e2e</artifactId>
+
+    <properties>
+        <testcontainers.version>1.12.5</testcontainers.version>
+    </properties>
+
+    <dependencies>
+        <!-- Stateful Functions -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-kafka-io</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Protobuf -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+
+        <!-- logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.15</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>3.14.6</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- End-to-end test common -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-e2e-tests-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Testcontainers KafkaContainer -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <version>${protoc-jar-maven-plugin.version}</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
new file mode 100644
index 0000000..ab66809
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+
+final class Constants {
+
+  private Constants() {}
+
+  static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
+
+  static final IngressIdentifier<WrappedMessage> INGRESS_ID =
+      new IngressIdentifier<>(
+          WrappedMessage.class, "org.apache.flink.e2e.exactlyonce", "wrapped-messages");
+
+  static final EgressIdentifier<InvokeCount> EGRESS_ID =
+      new EgressIdentifier<>(
+          "org.apache.flink.e2e.exactlyonce", "invoke-counts", InvokeCount.class);
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
new file mode 100644
index 0000000..9dbb465
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+/**
+ * This is a simple application used for testing end-to-end exactly-once semantics.
+ *
+ * <p>The application reads {@link WrappedMessage}s from a Kafka ingress; these are routed to {@link
+ * FnUnwrapper} functions, which in turn simply forward the messages to {@link FnCounter} functions
+ * with the target keys specified in the wrapped message. The counter function keeps count of
+ * the number of times each key has been invoked, and sinks that count to an exactly-once delivery
+ * Kafka egress for verification.
+ */
+@AutoService(StatefulFunctionModule.class)
+public class ExactlyOnceVerificationModule implements StatefulFunctionModule {
+
+  @Override
+  public void configure(Map<String, String> globalConfiguration, Binder binder) {
+    String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
+    if (kafkaBootstrapServers == null) {
+      throw new IllegalStateException(
+          "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
+    }
+
+    configureKafkaIO(kafkaBootstrapServers, binder);
+    configureAddressTaggerFunctions(binder);
+  }
+
+  private static void configureKafkaIO(String kafkaAddress, Binder binder) {
+    final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
+
+    binder.bindIngress(kafkaIO.getIngressSpec());
+    binder.bindIngressRouter(
+        Constants.INGRESS_ID,
+        ((message, downstream) -> downstream.forward(FnUnwrapper.TYPE, message.getKey(), message)));
+
+    binder.bindEgress(kafkaIO.getEgressSpec());
+  }
+
+  private static void configureAddressTaggerFunctions(Binder binder) {
+    binder.bindFunctionProvider(FnUnwrapper.TYPE, ignored -> new FnUnwrapper());
+    binder.bindFunctionProvider(FnCounter.TYPE, ignored -> new FnCounter());
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
new file mode 100644
index 0000000..5243ebd
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+final class FnCounter implements StatefulFunction {
+
+  static final FunctionType TYPE = new FunctionType("org.apache.flink.e2e.exactlyonce", "counter");
+
+  @Persisted
+  private final PersistedValue<Integer> invokeCountState =
+      PersistedValue.of("invoke-count", Integer.class);
+
+  @Override
+  public void invoke(Context context, Object input) {
+    final int previousCount = invokeCountState.getOrDefault(0);
+    final int currentCount = previousCount + 1;
+
+    final InvokeCount invokeCount =
+        InvokeCount.newBuilder().setId(context.self().id()).setInvokeCount(currentCount).build();
+    invokeCountState.set(currentCount);
+
+    context.send(Constants.EGRESS_ID, invokeCount);
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
new file mode 100644
index 0000000..990e545
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+
+final class FnUnwrapper implements StatefulFunction {
+
+  static final FunctionType TYPE =
+      new FunctionType("org.apache.flink.e2e.exactlyonce", "unwrapper");
+
+  @Override
+  public void invoke(Context context, Object input) {
+    final WrappedMessage message = requireWrappedMessage(input);
+    context.send(FnCounter.TYPE, message.getInvokeTargetId(), message);
+  }
+
+  private static WrappedMessage requireWrappedMessage(Object input) {
+    return (WrappedMessage) input;
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
new file mode 100644
index 0000000..4df60ca
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+final class KafkaIO {
+
+  static final String WRAPPED_MESSAGES_TOPIC_NAME = "wrapped-messages";
+  static final String INVOKE_COUNTS_TOPIC_NAME = "invoke-counts";
+
+  private final String kafkaAddress;
+
+  KafkaIO(String kafkaAddress) {
+    this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
+  }
+
+  IngressSpec<WrappedMessage> getIngressSpec() {
+    return KafkaIngressBuilder.forIdentifier(Constants.INGRESS_ID)
+        .withTopic(KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME)
+        .withKafkaAddress(kafkaAddress)
+        .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
+        .withConsumerGroupId("exactly-once-e2e")
+        .withDeserializer(WrappedMessageKafkaDeserializer.class)
+        .build();
+  }
+
+  EgressSpec<InvokeCount> getEgressSpec() {
+    return KafkaEgressBuilder.forIdentifier(Constants.EGRESS_ID)
+        .withKafkaAddress(kafkaAddress)
+        .withExactlyOnceProducerSemantics(Duration.ofMinutes(1))
+        .withSerializer(InvokeCountKafkaSerializer.class)
+        .build();
+  }
+
+  private static final class WrappedMessageKafkaDeserializer
+      implements KafkaIngressDeserializer<WrappedMessage> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public WrappedMessage deserialize(ConsumerRecord<byte[], byte[]> input) {
+      try {
+        return WrappedMessage.parseFrom(input.value());
+      } catch (Exception e) {
+        throw new RuntimeException("Error deserializing messages", e);
+      }
+    }
+  }
+
+  private static final class InvokeCountKafkaSerializer
+      implements KafkaEgressSerializer<InvokeCount> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(InvokeCount invokeCount) {
+      final byte[] key = invokeCount.getIdBytes().toByteArray();
+      final byte[] value = invokeCount.toByteArray();
+
+      return new ProducerRecord<>(INVOKE_COUNTS_TOPIC_NAME, key, value);
+    }
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
new file mode 100644
index 0000000..5e8b41a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+option java_package = "org.apache.flink.statefun.e2e.exactlyonce.generated";
+option java_multiple_files = false;
+
+message WrappedMessage {
+    string invokeTargetId = 1;
+    string key = 2;
+}
+
+message FnAddress {
+    string namespace = 1;
+    string type = 2;
+    string id = 3;
+}
+
+message InvokeCount {
+    string id = 1;
+    int32 invokeCount = 2;
+}


[flink-statefun] 07/17: [FLINK-17516] [e2e] Implement exactly-once E2E against ExactlyOnceVerificationModule

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 1057422f05e9fb0ec258c6fc7a4004f953d68d74
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 18:33:29 2020 +0800

    [FLINK-17516] [e2e] Implement exactly-once E2E against ExactlyOnceVerificationModule
    
    This closes #108.
---
 .../statefun/e2e/exactlyonce/ExactlyOnceE2E.java   | 155 +++++++++++++++++++++
 .../src/test/resources/Dockerfile                  |  20 +++
 .../src/test/resources/log4j.properties            |  24 ++++
 3 files changed, 199 insertions(+)

diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java
new file mode 100644
index 0000000..b0c61d0
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
+import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
+import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+
+/**
+ * End-to-end test based on the {@link ExactlyOnceVerificationModule} application.
+ *
+ * <p>This test writes some {@link WrappedMessage} records to Kafka, which eventually get routed to
+ * the counter function in the application. Then, after the corresponding {@link InvokeCount}s are
+ * seen in the Kafka egress (which implies some checkpoints have been completed since the
+ * verification application is using exactly-once delivery), we restart a worker to simulate
+ * failure. The application should automatically attempt to recover and eventually restart.
+ * Meanwhile, more records are written to Kafka again. We verify that on the consumer side, the
+ * invocation counts increase sequentially for each key as if the failure did not occur.
+ */
+public class ExactlyOnceE2E {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceE2E.class);
+
+  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
+  private static final String KAFKA_HOST = "kafka-broker";
+
+  private static final int NUM_WORKERS = 2;
+
+  /**
+   * Kafka broker. We need to explicitly set the transaction state log replication factor and min
+   * ISR since by default, those values are larger than 1 while we are only using 1 Kafka broker.
+   */
+  @Rule
+  public KafkaContainer kafka =
+      new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
+          .withNetworkAliases(KAFKA_HOST)
+          .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+          .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
+
+  @Rule
+  public StatefulFunctionsAppContainers verificationApp =
+      StatefulFunctionsAppContainers.builder("exactly-once-verification", NUM_WORKERS)
+          .dependsOn(kafka)
+          .exposeMasterLogs(LOG)
+          .withModuleGlobalConfiguration(
+              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
+          .build();
+
+  @Test(timeout = 300_000L)
+  public void run() throws Exception {
+    final String kafkaAddress = kafka.getBootstrapServers();
+
+    final Producer<String, WrappedMessage> messageProducer =
+        kafkaWrappedMessagesProducer(kafkaAddress);
+    final Consumer<String, InvokeCount> invokeCountsConsumer =
+        kafkaInvokeCountsConsumer(kafkaAddress);
+
+    final KafkaIOVerifier<String, WrappedMessage, String, InvokeCount> verifier =
+        new KafkaIOVerifier<>(messageProducer, invokeCountsConsumer);
+
+    assertThat(
+        verifier.sending(wrappedMessage("foo"), wrappedMessage("foo"), wrappedMessage("foo")),
+        verifier.resultsInOrder(
+            is(invokeCount("foo", 1)), is(invokeCount("foo", 2)), is(invokeCount("foo", 3))));
+
+    LOG.info(
+        "Restarting random worker to simulate failure. The application should automatically recover.");
+    verificationApp.restartWorker(randomWorkerIndex());
+
+    assertThat(
+        verifier.sending(wrappedMessage("foo"), wrappedMessage("foo"), wrappedMessage("foo")),
+        verifier.resultsInOrder(
+            is(invokeCount("foo", 4)), is(invokeCount("foo", 5)), is(invokeCount("foo", 6))));
+  }
+
+  private static Producer<String, WrappedMessage> kafkaWrappedMessagesProducer(
+      String bootstrapServers) {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", bootstrapServers);
+
+    return new KafkaProducer<>(
+        props, new StringSerializer(), new KafkaProtobufSerializer<>(WrappedMessage.parser()));
+  }
+
+  private Consumer<String, InvokeCount> kafkaInvokeCountsConsumer(String bootstrapServers) {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
+    consumerProps.setProperty("group.id", "exactly-once-e2e");
+    consumerProps.setProperty("auto.offset.reset", "earliest");
+    consumerProps.setProperty("isolation.level", "read_committed");
+
+    KafkaConsumer<String, InvokeCount> consumer =
+        new KafkaConsumer<>(
+            consumerProps,
+            new StringDeserializer(),
+            new KafkaProtobufSerializer<>(InvokeCount.parser()));
+    consumer.subscribe(Collections.singletonList(KafkaIO.INVOKE_COUNTS_TOPIC_NAME));
+
+    return consumer;
+  }
+
+  private static ProducerRecord<String, WrappedMessage> wrappedMessage(String targetInvokeId) {
+    final String key = UUID.randomUUID().toString();
+
+    return new ProducerRecord<>(
+        KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME,
+        key,
+        WrappedMessage.newBuilder().setInvokeTargetId(targetInvokeId).setKey(key).build());
+  }
+
+  private static InvokeCount invokeCount(String id, int count) {
+    return InvokeCount.newBuilder().setId(id).setInvokeCount(count).build();
+  }
+
+  private static int randomWorkerIndex() {
+    return new Random().nextInt(NUM_WORKERS);
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile
new file mode 100644
index 0000000..7b2cca6
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM flink-statefun:2.1-SNAPSHOT
+
+RUN mkdir -p /opt/statefun/modules/statefun-exactly-once-e2e
+COPY statefun-exactly-once-e2e*.jar /opt/statefun/modules/statefun-exactly-once-e2e/
+COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb965d3
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n


[flink-statefun] 04/17: [FLINK-17516] [e2e] Bind checkpoint dirs to temporary local directory

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit c9119870777cd9f51d392e63ac9d845ceceb1f34
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue May 12 23:24:55 2020 +0800

    [FLINK-17516] [e2e] Bind checkpoint dirs to temporary local directory
---
 .../e2e/common/StatefulFunctionsAppContainers.java  | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 5880ae3..9894c2d 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -19,6 +19,7 @@
 package org.apache.flink.statefun.e2e.common;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -33,9 +34,11 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
+import org.apache.flink.util.FileUtils;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -118,6 +121,8 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   private GenericContainer<?> master;
   private List<GenericContainer<?>> workers;
 
+  private File checkpointDir;
+
   private StatefulFunctionsAppContainers(
       GenericContainer<?> masterContainer, List<GenericContainer<?>> workerContainers) {
     this.master = Objects.requireNonNull(masterContainer);
@@ -137,6 +142,15 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
 
   @Override
   protected void before() throws Throwable {
+    checkpointDir = temporaryCheckpointDir();
+
+    master.withFileSystemBind(
+        checkpointDir.getAbsolutePath(), "/checkpoint-dir", BindMode.READ_WRITE);
+    workers.forEach(
+        worker ->
+            worker.withFileSystemBind(
+                checkpointDir.getAbsolutePath(), "/checkpoint-dir", BindMode.READ_WRITE));
+
     master.start();
     workers.forEach(GenericContainer::start);
   }
@@ -145,6 +159,8 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   protected void after() {
     master.stop();
     workers.forEach(GenericContainer::stop);
+
+    FileUtils.deleteDirectoryQuietly(checkpointDir);
   }
 
   /** @return the exposed port on master for calling REST APIs. */
@@ -152,6 +168,11 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     return master.getMappedPort(8081);
   }
 
+  private static File temporaryCheckpointDir() throws IOException {
+    final Path currentWorkingDir = Paths.get(System.getProperty("user.dir"));
+    return Files.createTempDirectory(currentWorkingDir, "statefun-app-checkpoints-").toFile();
+  }
+
   public static final class Builder {
     private static final String MASTER_HOST = "statefun-app-master";
     private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
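
A minimal sketch of the bind-mount pattern introduced above, for orientation only (not repository code; the image name is taken from the e2e Dockerfile earlier in this digest, and the class name is an illustrative assumption). It shows how a host-side temporary directory ends up backing the file:///checkpoint-dir location that the e2e flink-conf.yaml points at inside the containers:

    // Hedged sketch: bind a host temp directory to /checkpoint-dir, the path that
    // state.checkpoints.dir (file:///checkpoint-dir) resolves to inside the container.
    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.testcontainers.containers.BindMode;
    import org.testcontainers.containers.GenericContainer;

    public final class CheckpointBindSketch {
      public static void main(String[] args) throws Exception {
        Path hostCheckpointDir = Files.createTempDirectory("statefun-app-checkpoints-");

        GenericContainer<?> worker = new GenericContainer<>("flink-statefun:2.1-SNAPSHOT");
        worker.withFileSystemBind(
            hostCheckpointDir.toAbsolutePath().toString(), "/checkpoint-dir", BindMode.READ_WRITE);

        worker.start();
        // Anything the container writes under /checkpoint-dir now lands in hostCheckpointDir
        // on the host, so checkpoint data survives container restarts.
        worker.stop();
      }
    }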


[flink-statefun] 17/17: [hotfix] Avoid raw types in UnboundedFeedbackLoggerTest

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 12576da44e2dcb7efbe26486bd46abfdb7affb8f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 18:56:38 2020 +0800

    [hotfix] Avoid raw types in UnboundedFeedbackLoggerTest
---
 .../flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
index 75c4c55..dd7088f 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
@@ -50,7 +50,7 @@ public class UnboundedFeedbackLoggerTest {
 
   @Test
   public void sanity() {
-    UnboundedFeedbackLogger logger = instanceUnderTest(128, 1);
+    UnboundedFeedbackLogger<Integer> logger = instanceUnderTest(128, 1);
 
     ByteArrayOutputStream output = new ByteArrayOutputStream();
     logger.startLogging(output);
@@ -61,7 +61,7 @@ public class UnboundedFeedbackLoggerTest {
 
   @Test(expected = IllegalStateException.class)
   public void commitWithoutStartLoggingShouldBeIllegal() {
-    UnboundedFeedbackLogger logger = instanceUnderTest(128, 1);
+    UnboundedFeedbackLogger<Integer> logger = instanceUnderTest(128, 1);
 
     logger.commit();
   }


[flink-statefun] 03/17: [FLINK-17516] [e2e] Enable exactly-once checkpointing in all e2e tests

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit c03683bc7fd31912fbd31f0827317eceb72942e6
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue May 12 23:24:06 2020 +0800

    [FLINK-17516] [e2e] Enable exactly-once checkpointing in all e2e tests
---
 .../statefun-e2e-tests-common/src/main/resources/flink-conf.yaml        | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
index 7d7c307..d081c43 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
@@ -21,3 +21,5 @@ state.checkpoints.dir: file:///checkpoint-dir
 state.backend.incremental: true
 taskmanager.memory.process.size: 4g
 jobmanager.scheduler: legacy
+execution.checkpointing.interval: 5sec
+execution.checkpointing.mode: EXACTLY_ONCE
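
For readers less familiar with these Flink options, the following is a hedged, programmatic equivalent of the two new YAML entries on a Flink 1.10 StreamExecutionEnvironment (illustration only; this is not how the e2e harness configures checkpointing, and the class name is an assumption):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public final class ExactlyOnceCheckpointingSketch {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Equivalent of:
        //   execution.checkpointing.interval: 5sec
        //   execution.checkpointing.mode: EXACTLY_ONCE
        env.enableCheckpointing(5_000L, CheckpointingMode.EXACTLY_ONCE);
      }
    }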


[flink-statefun] 08/17: [FLINK-17712] [build] Upgrade Flink version to 1.10.1

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 0fd74b4ec146938a7939b4c72b41f705413fa8d7
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 13:16:47 2020 +0800

    [FLINK-17712] [build] Upgrade Flink version to 1.10.1
    
    This closes #111.
---
 pom.xml                 | 2 +-
 tools/docker/Dockerfile | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index e38c2dd..40e164a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@ under the License.
         <auto-service.version>1.0-rc6</auto-service.version>
         <protobuf.version>3.7.1</protobuf.version>
         <protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
-        <flink.version>1.10.0</flink.version>
+        <flink.version>1.10.1</flink.version>
     </properties>
 
     <dependencies>
diff --git a/tools/docker/Dockerfile b/tools/docker/Dockerfile
index 7c41d38..e697e33 100644
--- a/tools/docker/Dockerfile
+++ b/tools/docker/Dockerfile
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM flink:1.10.0
+FROM flink:1.10.1
 
 ENV ROLE worker
 ENV MASTER_HOST localhost


[flink-statefun] 15/17: [FLINK-17533] Remove concurrent checkpoints limitation

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 6e762807f58aa619abd5b999dd72b499b389c8a5
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 23:03:36 2020 +0200

    [FLINK-17533] Remove concurrent checkpoints limitation
    
    This closes #105.
---
 docs/deployment-and-operations/packaging.md                  |  1 -
 .../flink/core/StatefulFunctionsConfigValidator.java         | 12 ------------
 .../docker/flink-distribution-template/conf/flink-conf.yaml  |  1 -
 3 files changed, 14 deletions(-)

diff --git a/docs/deployment-and-operations/packaging.md b/docs/deployment-and-operations/packaging.md
index d603920..7af18f1 100644
--- a/docs/deployment-and-operations/packaging.md
+++ b/docs/deployment-and-operations/packaging.md
@@ -73,6 +73,5 @@ The following configurations are strictly required for running StateFun applicat
 
 {% highlight yaml %}
 classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
-execution.checkpointing.max-concurrent-checkpoints: 1
 {% endhighlight %}
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
index 9a7b0d1..c4f658c 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -27,7 +27,6 @@ import java.util.Set;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 
 public final class StatefulFunctionsConfigValidator {
 
@@ -41,7 +40,6 @@ public final class StatefulFunctionsConfigValidator {
 
   static void validate(Configuration configuration) {
     validateParentFirstClassloaderPatterns(configuration);
-    validateMaxConcurrentCheckpoints(configuration);
   }
 
   private static void validateParentFirstClassloaderPatterns(Configuration configuration) {
@@ -54,16 +52,6 @@ public final class StatefulFunctionsConfigValidator {
     }
   }
 
-  private static void validateMaxConcurrentCheckpoints(Configuration configuration) {
-    final int maxConcurrentCheckpoints =
-        configuration.get(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS);
-    if (maxConcurrentCheckpoints != 1) {
-      throw new StatefulFunctionsInvalidConfigException(
-          ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS,
-          "Value must be 1, Stateful Functions does not support concurrent checkpoints.");
-    }
-  }
-
   private static Set<String> parentFirstClassloaderPatterns(Configuration configuration) {
     final String[] split =
         configuration.get(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL).split(";");
diff --git a/tools/docker/flink-distribution-template/conf/flink-conf.yaml b/tools/docker/flink-distribution-template/conf/flink-conf.yaml
index d0d4522..430c4cb 100644
--- a/tools/docker/flink-distribution-template/conf/flink-conf.yaml
+++ b/tools/docker/flink-distribution-template/conf/flink-conf.yaml
@@ -19,7 +19,6 @@
 #==============================================================================
 
 classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
-execution.checkpointing.max-concurrent-checkpoints: 1
 
 #==============================================================================
 # Recommended configurations. Users may change according to their needs.


[flink-statefun] 13/17: [FLINK-17533] Expose checkpointId from barrier messages

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit ac6f2530ae8ba4272e8c3c4dc38bdb499092d9c0
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 22:27:02 2020 +0200

    [FLINK-17533] Expose checkpointId from barrier messages
    
    This commit changes the method isBarrierMessage(), which previously returned a
    boolean, to return an OptionalLong that carries the checkpointId when the
    message is a checkpoint barrier and is empty otherwise.
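
    To make the shape of the new contract concrete, here is a minimal, self-contained
    sketch of the call-site pattern this enables. The Message and Checkpoints interfaces
    below are simplified stand-ins (assumptions for illustration), not the actual
    statefun classes:

        import java.util.OptionalLong;

        public final class BarrierCheckSketch {

          // Simplified stand-in: a message that may carry a checkpoint barrier.
          interface Message {
            OptionalLong isBarrierMessage();
          }

          // Simplified stand-in: per-checkpoint feedback-logger bookkeeping.
          interface Checkpoints<T> {
            void commitCheckpointsUntil(long checkpointId);

            void append(T element);
          }

          // Barriers commit every checkpoint up to their id; all other messages are
          // appended to every checkpoint that is still in flight.
          static <T extends Message> void process(T element, Checkpoints<T> checkpoints) {
            OptionalLong maybeCheckpoint = element.isBarrierMessage();
            if (maybeCheckpoint.isPresent()) {
              checkpoints.commitCheckpointsUntil(maybeCheckpoint.getAsLong());
            } else {
              checkpoints.append(element);
            }
          }
        }
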
---
 .../flink/core/feedback/FeedbackUnionOperatorFactory.java     |  6 +++---
 .../statefun/flink/core/functions/AsyncMessageDecorator.java  |  5 +++--
 .../org/apache/flink/statefun/flink/core/message/Message.java | 11 ++++++++++-
 .../flink/statefun/flink/core/message/ProtobufMessage.java    |  9 +++++++--
 .../apache/flink/statefun/flink/core/message/SdkMessage.java  |  5 +++--
 .../flink/statefun/flink/core/translation/FlinkUniverse.java  |  7 ++++---
 6 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
index 580ad2d..f87128d 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
@@ -18,10 +18,10 @@
 package org.apache.flink.statefun.flink.core.feedback;
 
 import java.util.Objects;
+import java.util.OptionalLong;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.common.SerializableFunction;
-import org.apache.flink.statefun.flink.core.common.SerializablePredicate;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.*;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -35,7 +35,7 @@ public final class FeedbackUnionOperatorFactory<E>
   private final StatefulFunctionsConfig configuration;
 
   private final FeedbackKey<E> feedbackKey;
-  private final SerializablePredicate<E> isBarrierMessage;
+  private final SerializableFunction<E, OptionalLong> isBarrierMessage;
   private final SerializableFunction<E, ?> keySelector;
 
   private transient MailboxExecutor mailboxExecutor;
@@ -43,7 +43,7 @@ public final class FeedbackUnionOperatorFactory<E>
   public FeedbackUnionOperatorFactory(
       StatefulFunctionsConfig configuration,
       FeedbackKey<E> feedbackKey,
-      SerializablePredicate<E> isBarrierMessage,
+      SerializableFunction<E, OptionalLong> isBarrierMessage,
       SerializableFunction<E, ?> keySelector) {
     this.feedbackKey = Objects.requireNonNull(feedbackKey);
     this.isBarrierMessage = Objects.requireNonNull(isBarrierMessage);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
index ee6e869..c77adb7 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.statefun.flink.core.functions;
 
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.flink.core.message.Message;
@@ -87,8 +88,8 @@ final class AsyncMessageDecorator<T> implements Message {
   }
 
   @Override
-  public boolean isBarrierMessage() {
-    return false;
+  public OptionalLong isBarrierMessage() {
+    return OptionalLong.empty();
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java
index 2e4cf99..9b0cbee 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java
@@ -18,6 +18,7 @@
 package org.apache.flink.statefun.flink.core.message;
 
 import java.io.IOException;
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.sdk.Address;
@@ -31,7 +32,15 @@ public interface Message {
 
   Object payload(MessageFactory context, ClassLoader targetClassLoader);
 
-  boolean isBarrierMessage();
+  /**
+   * Returns an empty {@code OptionalLong} for non-barrier messages, or the wrapped checkpointId
+   * for barrier messages.
+   *
+   * <p>When this message represents a checkpoint barrier, the returned {@code OptionalLong}
+   * carries the id of the checkpoint that produced that barrier. For other types of messages
+   * (i.e. {@code Payload}) the returned {@code OptionalLong} is empty.
+   */
+  OptionalLong isBarrierMessage();
 
   Message copy(MessageFactory context);
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java
index 8880a5d..dabda14 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.message;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.flink.core.generated.Envelope;
@@ -72,8 +73,12 @@ final class ProtobufMessage implements Message {
   }
 
   @Override
-  public boolean isBarrierMessage() {
-    return envelope.hasCheckpoint();
+  public OptionalLong isBarrierMessage() {
+    if (!envelope.hasCheckpoint()) {
+      return OptionalLong.empty();
+    }
+    final long checkpointId = envelope.getCheckpoint().getCheckpointId();
+    return OptionalLong.of(checkpointId);
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java
index cfc785c..c10f2e9 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.message;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.flink.core.generated.Envelope;
@@ -62,8 +63,8 @@ final class SdkMessage implements Message {
   }
 
   @Override
-  public boolean isBarrierMessage() {
-    return false;
+  public OptionalLong isBarrierMessage() {
+    return OptionalLong.empty();
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
index ff8df49..df61045 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.translation;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.function.LongFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
@@ -26,7 +27,6 @@ import org.apache.flink.statefun.flink.core.StatefulFunctionsJobConstants;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.common.KeyBy;
 import org.apache.flink.statefun.flink.core.common.SerializableFunction;
-import org.apache.flink.statefun.flink.core.common.SerializablePredicate;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperatorFactory;
@@ -123,12 +123,13 @@ public final class FlinkUniverse {
     c.getTransformation().setParallelism(b.getParallelism());
   }
 
-  private static final class IsCheckpointBarrier implements SerializablePredicate<Message> {
+  private static final class IsCheckpointBarrier
+      implements SerializableFunction<Message, OptionalLong> {
 
     private static final long serialVersionUID = 1;
 
     @Override
-    public boolean test(Message message) {
+    public OptionalLong apply(Message message) {
       return message.isBarrierMessage();
     }
   }


[flink-statefun] 01/17: [FLINK-17516] [e2e] Refactor StatefulFunctionsAppContainers to use Builder pattern

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 3f3da7aa3417c1c2ff26ac918c6896d319a0f6ec
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue May 12 22:57:03 2020 +0800

    [FLINK-17516] [e2e] Refactor StatefulFunctionsAppContainers to use Builder pattern
---
 .../e2e/common/StatefulFunctionsAppContainers.java | 328 +++++++++++----------
 .../e2e/routablekafka/RoutableKafkaE2E.java        |   5 +-
 .../statefun/e2e/sanity/SanityVerificationE2E.java |   5 +-
 3 files changed, 183 insertions(+), 155 deletions(-)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index f648fcd..9a3f2a3 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -55,7 +55,7 @@ import org.testcontainers.images.builder.ImageFromDockerfile;
  *
  *     {@code @Rule}
  *     public StatefulFunctionsAppContainers myApp =
- *         new StatefulFunctionsAppContainers("app-name", 3);
+ *         StatefulFunctionsAppContainers.builder("app-name", 3).build();
  *
  *     {@code @Test}
  *     public void runTest() {
@@ -77,15 +77,16 @@ import org.testcontainers.images.builder.ImageFromDockerfile;
  *
  *     {@code @Rule}
  *     public StatefulFunctionsAppContainers myApp =
- *         new StatefulFunctionsAppContainers("app-name", 3)
- *             .dependsOn(kafka);
+ *         StatefulFunctionsAppContainers.builder("app-name", 3)
+ *             .dependsOn(kafka)
+ *             .build();
  *
  *     ...
  * }
  * }</pre>
  *
  * <p>Application master and worker containers will always be started after containers that are
- * added using {@link #dependsOn(GenericContainer)} have started. Moreover, containers being
+ * added using {@link Builder#dependsOn(GenericContainer)} have started. Moreover, containers being
  * depended on will also be set up such that they share the same network with the master and workers,
  * so that they can freely communicate with each other.
  *
@@ -114,70 +115,28 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
 
   private static final Logger LOG = LoggerFactory.getLogger(StatefulFunctionsAppContainers.class);
 
-  private static final String MASTER_HOST = "statefun-app-master";
-  private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
-
-  private final String appName;
-  private final int numWorkers;
-  private final Network network;
-
-  private final Configuration dynamicProperties = new Configuration();
-  private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
-  private final List<ClasspathBuildContextFile> classpathBuildContextFiles = new ArrayList<>();
-  private Logger masterLogger;
-
   private GenericContainer<?> master;
   private List<GenericContainer<?>> workers;
 
-  public StatefulFunctionsAppContainers(String appName, int numWorkers) {
-    if (appName == null || appName.isEmpty()) {
-      throw new IllegalArgumentException(
-          "App name must be non-empty. This is used as the application image name.");
-    }
-    if (numWorkers < 1) {
-      throw new IllegalArgumentException("Must have at least 1 worker.");
-    }
-
-    this.network = Network.newNetwork();
-    this.appName = appName;
-    this.numWorkers = numWorkers;
-  }
-
-  public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> container) {
-    container.withNetwork(network);
-    this.dependentContainers.add(container);
-    return this;
-  }
-
-  public StatefulFunctionsAppContainers exposeMasterLogs(Logger logger) {
-    this.masterLogger = logger;
-    return this;
+  private StatefulFunctionsAppContainers(
+      GenericContainer<?> masterContainer, List<GenericContainer<?>> workerContainers) {
+    this.master = Objects.requireNonNull(masterContainer);
+    this.workers = Objects.requireNonNull(workerContainers);
   }
 
-  public StatefulFunctionsAppContainers withModuleGlobalConfiguration(String key, String value) {
-    this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + key, value);
-    return this;
-  }
-
-  public <T> StatefulFunctionsAppContainers withConfiguration(ConfigOption<T> config, T value) {
-    this.dynamicProperties.set(config, value);
-    return this;
-  }
-
-  public StatefulFunctionsAppContainers withBuildContextFileFromClasspath(
-      String buildContextPath, String resourcePath) {
-    this.classpathBuildContextFiles.add(
-        new ClasspathBuildContextFile(buildContextPath, resourcePath));
-    return this;
+  /**
+   * Creates a builder for creating a {@link StatefulFunctionsAppContainers}.
+   *
+   * @param appName the name of the application.
+   * @param numWorkers the number of workers to run the application.
+   * @return a builder for creating a {@link StatefulFunctionsAppContainers}.
+   */
+  public static Builder builder(String appName, int numWorkers) {
+    return new Builder(appName, numWorkers);
   }
 
   @Override
   protected void before() throws Throwable {
-    final ImageFromDockerfile appImage =
-        appImage(appName, dynamicProperties, classpathBuildContextFiles);
-    this.master = masterContainer(appImage, network, dependentContainers, numWorkers, masterLogger);
-    this.workers = workerContainers(appImage, numWorkers, network);
-
     master.start();
     workers.forEach(GenericContainer::start);
   }
@@ -188,122 +147,189 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     workers.forEach(GenericContainer::stop);
   }
 
-  private static ImageFromDockerfile appImage(
-      String appName,
-      Configuration dynamicProperties,
-      List<ClasspathBuildContextFile> classpathBuildContextFiles) {
-    final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
-    LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
-
-    final ImageFromDockerfile appImage =
-        new ImageFromDockerfile(appName)
-            .withFileFromClasspath("Dockerfile", "Dockerfile")
-            .withFileFromPath(".", targetDirPath);
-
-    Configuration flinkConf = resolveFlinkConf(dynamicProperties);
-    String flinkConfString = flinkConfigAsString(flinkConf);
-    LOG.info(
-        "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
-        flinkConf);
-    appImage.withFileFromString("flink-conf.yaml", flinkConfString);
-
-    for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) {
-      appImage.withFileFromClasspath(
-          classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath);
+  public static final class Builder {
+    private static final String MASTER_HOST = "statefun-app-master";
+    private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
+
+    private final String appName;
+    private final int numWorkers;
+    private final Network network;
+
+    private final Configuration dynamicProperties = new Configuration();
+    private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
+    private final List<ClasspathBuildContextFile> classpathBuildContextFiles = new ArrayList<>();
+    private Logger masterLogger;
+
+    private Builder(String appName, int numWorkers) {
+      if (appName == null || appName.isEmpty()) {
+        throw new IllegalArgumentException(
+            "App name must be non-empty. This is used as the application image name.");
+      }
+      if (numWorkers < 1) {
+        throw new IllegalArgumentException("Must have at least 1 worker.");
+      }
+
+      this.network = Network.newNetwork();
+      this.appName = appName;
+      this.numWorkers = numWorkers;
     }
 
-    return appImage;
-  }
+    public StatefulFunctionsAppContainers.Builder dependsOn(GenericContainer<?> container) {
+      container.withNetwork(network);
+      this.dependentContainers.add(container);
+      return this;
+    }
 
-  /**
-   * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
-   * resources.
-   */
-  private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
-    final InputStream baseFlinkConfResourceInputStream =
-        StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
-    if (baseFlinkConfResourceInputStream == null) {
-      throw new RuntimeException("Base flink-conf.yaml cannot be found.");
+    public StatefulFunctionsAppContainers.Builder exposeMasterLogs(Logger logger) {
+      this.masterLogger = logger;
+      return this;
     }
 
-    final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
-    return GlobalConfiguration.loadConfiguration(
-        tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
-  }
+    public StatefulFunctionsAppContainers.Builder withModuleGlobalConfiguration(
+        String key, String value) {
+      this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + key, value);
+      return this;
+    }
 
-  private static String flinkConfigAsString(Configuration configuration) {
-    StringBuilder yaml = new StringBuilder();
-    for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
-      yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
+    public <T> StatefulFunctionsAppContainers.Builder withConfiguration(
+        ConfigOption<T> config, T value) {
+      this.dynamicProperties.set(config, value);
+      return this;
     }
 
-    return yaml.toString();
-  }
+    public StatefulFunctionsAppContainers.Builder withBuildContextFileFromClasspath(
+        String buildContextPath, String resourcePath) {
+      this.classpathBuildContextFiles.add(
+          new ClasspathBuildContextFile(buildContextPath, resourcePath));
+      return this;
+    }
 
-  private static File copyToTempFlinkConfFile(InputStream inputStream) {
-    try {
-      final File tempFile =
-          new File(
-              Files.createTempDirectory("statefun-app-containers").toString(), "flink-conf.yaml");
-      Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
-      return tempFile;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+    public StatefulFunctionsAppContainers build() {
+      final ImageFromDockerfile appImage =
+          appImage(appName, dynamicProperties, classpathBuildContextFiles);
+
+      return new StatefulFunctionsAppContainers(
+          masterContainer(appImage, network, dependentContainers, numWorkers, masterLogger),
+          workerContainers(appImage, numWorkers, network));
     }
-  }
 
-  private static GenericContainer<?> masterContainer(
-      ImageFromDockerfile appImage,
-      Network network,
-      List<GenericContainer<?>> dependents,
-      int numWorkers,
-      @Nullable Logger masterLogger) {
-    final GenericContainer<?> master =
-        new GenericContainer(appImage)
-            .withNetwork(network)
-            .withNetworkAliases(MASTER_HOST)
-            .withEnv("ROLE", "master")
-            .withEnv("MASTER_HOST", MASTER_HOST)
-            .withCommand("-p " + numWorkers);
-
-    for (GenericContainer<?> dependent : dependents) {
-      master.dependsOn(dependent);
+    private static ImageFromDockerfile appImage(
+        String appName,
+        Configuration dynamicProperties,
+        List<ClasspathBuildContextFile> classpathBuildContextFiles) {
+      final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
+      LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
+
+      final ImageFromDockerfile appImage =
+          new ImageFromDockerfile(appName)
+              .withFileFromClasspath("Dockerfile", "Dockerfile")
+              .withFileFromPath(".", targetDirPath);
+
+      Configuration flinkConf = resolveFlinkConf(dynamicProperties);
+      String flinkConfString = flinkConfigAsString(flinkConf);
+      LOG.info(
+          "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
+          flinkConf);
+      appImage.withFileFromString("flink-conf.yaml", flinkConfString);
+
+      for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) {
+        appImage.withFileFromClasspath(
+            classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath);
+      }
+
+      return appImage;
     }
 
-    if (masterLogger != null) {
-      master.withLogConsumer(new Slf4jLogConsumer(masterLogger, true));
+    /**
+     * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
+     * resources.
+     */
+    private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
+      final InputStream baseFlinkConfResourceInputStream =
+          StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
+      if (baseFlinkConfResourceInputStream == null) {
+        throw new RuntimeException("Base flink-conf.yaml cannot be found.");
+      }
+
+      final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
+      return GlobalConfiguration.loadConfiguration(
+          tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
     }
 
-    return master;
-  }
+    private static String flinkConfigAsString(Configuration configuration) {
+      StringBuilder yaml = new StringBuilder();
+      for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
+        yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
+      }
+
+      return yaml.toString();
+    }
 
-  private static List<GenericContainer<?>> workerContainers(
-      ImageFromDockerfile appImage, int numWorkers, Network network) {
-    final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
+    private static File copyToTempFlinkConfFile(InputStream inputStream) {
+      try {
+        final File tempFile =
+            new File(
+                Files.createTempDirectory("statefun-app-containers").toString(), "flink-conf.yaml");
+        Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        return tempFile;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
 
-    for (int i = 0; i < numWorkers; i++) {
-      workers.add(
+    private static GenericContainer<?> masterContainer(
+        ImageFromDockerfile appImage,
+        Network network,
+        List<GenericContainer<?>> dependents,
+        int numWorkers,
+        @Nullable Logger masterLogger) {
+      final GenericContainer<?> master =
           new GenericContainer(appImage)
               .withNetwork(network)
-              .withNetworkAliases(workerHostOf(i))
-              .withEnv("ROLE", "worker")
-              .withEnv("MASTER_HOST", MASTER_HOST));
+              .withNetworkAliases(MASTER_HOST)
+              .withEnv("ROLE", "master")
+              .withEnv("MASTER_HOST", MASTER_HOST)
+              .withCommand("-p " + numWorkers);
+
+      for (GenericContainer<?> dependent : dependents) {
+        master.dependsOn(dependent);
+      }
+
+      if (masterLogger != null) {
+        master.withLogConsumer(new Slf4jLogConsumer(masterLogger, true));
+      }
+
+      return master;
     }
 
-    return workers;
-  }
+    private static List<GenericContainer<?>> workerContainers(
+        ImageFromDockerfile appImage, int numWorkers, Network network) {
+      final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
 
-  private static String workerHostOf(int workerIndex) {
-    return WORKER_HOST_PREFIX + "-" + workerIndex;
-  }
+      for (int i = 0; i < numWorkers; i++) {
+        workers.add(
+            new GenericContainer(appImage)
+                .withNetwork(network)
+                .withNetworkAliases(workerHostOf(i))
+                .withEnv("ROLE", "worker")
+                .withEnv("MASTER_HOST", MASTER_HOST));
+      }
+
+      return workers;
+    }
+
+    private static String workerHostOf(int workerIndex) {
+      return WORKER_HOST_PREFIX + "-" + workerIndex;
+    }
 
-  private static class ClasspathBuildContextFile {
-    private final String buildContextPath;
-    private final String fromResourcePath;
+    private static class ClasspathBuildContextFile {
+      private final String buildContextPath;
+      private final String fromResourcePath;
 
-    ClasspathBuildContextFile(String buildContextPath, String fromResourcePath) {
-      this.buildContextPath = Objects.requireNonNull(buildContextPath);
-      this.fromResourcePath = Objects.requireNonNull(fromResourcePath);
+      ClasspathBuildContextFile(String buildContextPath, String fromResourcePath) {
+        this.buildContextPath = Objects.requireNonNull(buildContextPath);
+        this.fromResourcePath = Objects.requireNonNull(fromResourcePath);
+      }
     }
   }
 }
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
index 9f4eda1..3869163 100644
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
+++ b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
@@ -64,13 +64,14 @@ public class RoutableKafkaE2E {
 
   @Rule
   public StatefulFunctionsAppContainers verificationApp =
-      new StatefulFunctionsAppContainers("routable-kafka-verification", 1)
+      StatefulFunctionsAppContainers.builder("routable-kafka-verification", 1)
           .dependsOn(kafka)
           .exposeMasterLogs(LOG)
           .withBuildContextFileFromClasspath(
               "routable-kafka-ingress-module", "/routable-kafka-ingress-module/")
           .withModuleGlobalConfiguration(
-              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092");
+              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
+          .build();
 
   @Test(timeout = 60_000L)
   public void run() {
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
index 1bd08c7..473dc9b 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
@@ -63,11 +63,12 @@ public class SanityVerificationE2E {
 
   @Rule
   public StatefulFunctionsAppContainers verificationApp =
-      new StatefulFunctionsAppContainers("sanity-verification", 2)
+      StatefulFunctionsAppContainers.builder("sanity-verification", 2)
           .dependsOn(kafka)
           .exposeMasterLogs(LOG)
           .withModuleGlobalConfiguration(
-              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092");
+              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
+          .build();
 
   @Test(timeout = 60_000L)
   public void run() throws Exception {


[flink-statefun] 10/17: [FLINK-16928] Remove jobmanager.scheduler from all config files and docs

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit e86890de8221a1f2ef31a96d311be25981a4ed12
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri May 15 14:25:46 2020 +0800

    [FLINK-16928] Remove jobmanager.scheduler from all config files and docs
    
    This closes #112.
---
 docs/deployment-and-operations/packaging.md                              | 1 -
 .../statefun-e2e-tests-common/src/main/resources/flink-conf.yaml         | 1 -
 tools/docker/flink-distribution-template/conf/flink-conf.yaml            | 1 -
 tools/k8s/templates/config-map.yaml                                      | 1 -
 4 files changed, 4 deletions(-)

diff --git a/docs/deployment-and-operations/packaging.md b/docs/deployment-and-operations/packaging.md
index c90944b..d603920 100644
--- a/docs/deployment-and-operations/packaging.md
+++ b/docs/deployment-and-operations/packaging.md
@@ -73,7 +73,6 @@ The following configurations are strictly required for running StateFun applicat
 
 {% highlight yaml %}
 classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
-jobmanager.scheduler: legacy
 execution.checkpointing.max-concurrent-checkpoints: 1
 {% endhighlight %}
 
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
index d081c43..b022b0a 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
@@ -20,6 +20,5 @@ state.backend.rocksdb.timer-service.factory: ROCKSDB
 state.checkpoints.dir: file:///checkpoint-dir
 state.backend.incremental: true
 taskmanager.memory.process.size: 4g
-jobmanager.scheduler: legacy
 execution.checkpointing.interval: 5sec
 execution.checkpointing.mode: EXACTLY_ONCE
diff --git a/tools/docker/flink-distribution-template/conf/flink-conf.yaml b/tools/docker/flink-distribution-template/conf/flink-conf.yaml
index 574ccb0..d0d4522 100644
--- a/tools/docker/flink-distribution-template/conf/flink-conf.yaml
+++ b/tools/docker/flink-distribution-template/conf/flink-conf.yaml
@@ -20,7 +20,6 @@
 
 classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
 execution.checkpointing.max-concurrent-checkpoints: 1
-jobmanager.scheduler: legacy
 
 #==============================================================================
 # Recommended configurations. Users may change according to their needs.
diff --git a/tools/k8s/templates/config-map.yaml b/tools/k8s/templates/config-map.yaml
index d6bda3e..a57e7ff 100644
--- a/tools/k8s/templates/config-map.yaml
+++ b/tools/k8s/templates/config-map.yaml
@@ -34,7 +34,6 @@ data:
     execution.checkpointing.interval: {{ .Values.checkpoint.interval }}
     taskmanager.memory.process.size: {{ .Values.worker.jvm_mem }}
     parallelism.default: {{ .Values.worker.replicas }}
-    jobmanager.scheduler: legacy
 
   log4j-console.properties: |+
     log4j.rootLogger=INFO, console


[flink-statefun] 05/17: [FLINK-17516] [e2e] Allow restarting workers with StatefulFunctionsAppContainers

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 168e585f7b61ba98fdb08e86df3a2bb53fe8554f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 14 12:50:35 2020 +0800

    [FLINK-17516] [e2e] Allow restarting workers with StatefulFunctionsAppContainers
---
 .../e2e/common/StatefulFunctionsAppContainers.java       | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 9894c2d..f03a5de 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -168,6 +168,22 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     return master.getMappedPort(8081);
   }
 
+  /**
+   * Restarts a single worker of this Stateful Functions application.
+   *
+   * @param workerIndex the index of the worker to restart.
+   */
+  public void restartWorker(int workerIndex) {
+    if (workerIndex >= workers.size()) {
+      throw new IndexOutOfBoundsException(
+          "Invalid worker index; valid values are 0 to " + (workers.size() - 1));
+    }
+
+    final GenericContainer<?> worker = workers.get(workerIndex);
+    worker.stop();
+    worker.start();
+  }
+
   private static File temporaryCheckpointDir() throws IOException {
     final Path currentWorkingDir = Paths.get(System.getProperty("user.dir"));
     return Files.createTempDirectory(currentWorkingDir, "statefun-app-checkpoints-").toFile();
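
As a usage illustration only (a hypothetical test, not part of this commit; the class name, app name, and verification steps are assumptions), the new restartWorker(int) hook can be combined with the containers builder to simulate a TaskManager failure in an exactly-once E2E test:

    import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
    import org.junit.Rule;
    import org.junit.Test;

    public class WorkerRestartSketchE2E {

      @Rule
      public StatefulFunctionsAppContainers app =
          StatefulFunctionsAppContainers.builder("worker-restart-sketch", 2).build();

      @Test(timeout = 60_000L)
      public void recoversFromWorkerRestart() throws Exception {
        // ... write some records to the application's ingress here ...

        // Stop and start worker 0; with exactly-once checkpointing enabled, the job should
        // recover from the latest checkpoint without duplicating visible effects.
        app.restartWorker(0);

        // ... consume from the egress and verify that nothing was duplicated ...
      }
    }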


[flink-statefun] 14/17: [FLINK-17533] Add support for multiple concurrent checkpoints

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit eac06fba7de3bfb7a9ca18c62e4b37adf635a837
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 22:32:10 2020 +0200

    [FLINK-17533] Add support for multiple concurrent checkpoints
---
 .../statefun/flink/core/feedback/Checkpoints.java  |  61 +++++++++
 .../flink/core/feedback/FeedbackUnionOperator.java |  34 ++---
 .../statefun/flink/core/logger/FeedbackLogger.java |  33 +++++
 .../flink/core/logger/UnboundedFeedbackLogger.java |   5 +-
 .../flink/core/feedback/CheckpointsTest.java       | 143 +++++++++++++++++++++
 5 files changed, 260 insertions(+), 16 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java
new file mode 100644
index 0000000..8fd0322
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.feedback;
+
+import java.io.OutputStream;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;
+import org.apache.flink.util.IOUtils;
+
+final class Checkpoints<T> implements AutoCloseable {
+  private final Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory;
+  private final TreeMap<Long, FeedbackLogger<T>> uncompletedCheckpoints = new TreeMap<>();
+
+  Checkpoints(Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory) {
+    this.feedbackLoggerFactory = Objects.requireNonNull(feedbackLoggerFactory);
+  }
+
+  public void startLogging(long checkpointId, OutputStream outputStream) {
+    FeedbackLogger<T> logger = feedbackLoggerFactory.get();
+    logger.startLogging(outputStream);
+    uncompletedCheckpoints.put(checkpointId, logger);
+  }
+
+  public void append(T element) {
+    for (FeedbackLogger<T> logger : uncompletedCheckpoints.values()) {
+      logger.append(element);
+    }
+  }
+
+  public void commitCheckpointsUntil(long checkpointId) {
+    SortedMap<Long, FeedbackLogger<T>> completedCheckpoints =
+        uncompletedCheckpoints.headMap(checkpointId, true);
+    completedCheckpoints.values().forEach(FeedbackLogger::commit);
+    completedCheckpoints.clear();
+  }
+
+  @Override
+  public void close() {
+    IOUtils.closeAllQuietly(uncompletedCheckpoints.values());
+    uncompletedCheckpoints.clear();
+  }
+}
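
An illustrative walk-through of the intended Checkpoints<T> lifecycle with two overlapping checkpoints (not repository code; assumes a class placed in the same package as Checkpoints, with a trivial in-memory stand-in for the FeedbackLogger interface introduced later in this commit):

    package org.apache.flink.statefun.flink.core.feedback;

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStream;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;

    public final class CheckpointsLifecycleSketch {

      // Trivial stand-in for a real feedback logger; it only records what it sees.
      static final class RecordingLogger implements FeedbackLogger<String> {
        final List<String> appended = new ArrayList<>();
        boolean committed;

        @Override
        public void startLogging(OutputStream keyedStateCheckpointOutputStream) {}

        @Override
        public void append(String message) {
          appended.add(message);
        }

        @Override
        public void commit() {
          committed = true;
        }

        @Override
        public void close() {}
      }

      public static void main(String[] args) {
        try (Checkpoints<String> checkpoints = new Checkpoints<>(RecordingLogger::new)) {
          // Two checkpoints are in flight at the same time.
          checkpoints.startLogging(1L, new ByteArrayOutputStream());
          checkpoints.startLogging(2L, new ByteArrayOutputStream());

          // Every appended element is logged once per uncompleted checkpoint.
          checkpoints.append("message-a");

          // The barrier for checkpoint 1 commits checkpoint 1 only; checkpoint 2 keeps logging.
          checkpoints.commitCheckpointsUntil(1L);

          checkpoints.append("message-b");
          checkpoints.commitCheckpointsUntil(2L);
        }
      }
    }
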
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
index 402ded8..78fe10d 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.statefun.flink.core.feedback;
 
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.concurrent.Executor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -26,9 +27,9 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
 import org.apache.flink.statefun.flink.core.common.SerializableFunction;
-import org.apache.flink.statefun.flink.core.common.SerializablePredicate;
 import org.apache.flink.statefun.flink.core.logger.Loggers;
 import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLogger;
+import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLoggerFactory;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.MailboxExecutor;
@@ -43,20 +44,20 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
 
   // -- configuration
   private final FeedbackKey<T> feedbackKey;
-  private final SerializablePredicate<T> isBarrierMessage;
+  private final SerializableFunction<T, OptionalLong> isBarrierMessage;
   private final SerializableFunction<T, ?> keySelector;
   private final long totalMemoryUsedForFeedbackCheckpointing;
   private final TypeSerializer<T> elementSerializer;
 
   // -- runtime
-  private transient UnboundedFeedbackLogger<T> feedbackLogger;
+  private transient Checkpoints<T> checkpoints;
   private transient boolean closedOrDisposed;
   private transient MailboxExecutor mailboxExecutor;
   private transient StreamRecord<T> reusable;
 
   FeedbackUnionOperator(
       FeedbackKey<T> feedbackKey,
-      SerializablePredicate<T> isBarrierMessage,
+      SerializableFunction<T, OptionalLong> isBarrierMessage,
       SerializableFunction<T, ?> keySelector,
       long totalMemoryUsedForFeedbackCheckpointing,
       TypeSerializer<T> elementSerializer,
@@ -84,11 +85,12 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
     if (closedOrDisposed) {
       return;
     }
-    if (isBarrierMessage.test(element)) {
-      feedbackLogger.commit();
+    OptionalLong maybeCheckpoint = isBarrierMessage.apply(element);
+    if (maybeCheckpoint.isPresent()) {
+      checkpoints.commitCheckpointsUntil(maybeCheckpoint.getAsLong());
     } else {
       sendDownstream(element);
-      feedbackLogger.append(element);
+      checkpoints.append(element);
     }
   }
 
@@ -105,22 +107,24 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
     // Initialize the unbounded feedback logger
     //
     @SuppressWarnings("unchecked")
-    UnboundedFeedbackLogger<T> feedbackLogger =
-        (UnboundedFeedbackLogger<T>)
-            Loggers.unboundedSpillableLogger(
+    UnboundedFeedbackLoggerFactory<T> feedbackLoggerFactory =
+        (UnboundedFeedbackLoggerFactory<T>)
+            Loggers.unboundedSpillableLoggerFactory(
                 ioManager,
                 maxParallelism,
                 totalMemoryUsedForFeedbackCheckpointing,
                 elementSerializer,
                 keySelector);
 
-    this.feedbackLogger = feedbackLogger;
+    this.checkpoints = new Checkpoints<>(feedbackLoggerFactory::create);
+
     //
     // we first must replay previously check-pointed envelopes before we start
     // processing any new envelopes.
     //
+    UnboundedFeedbackLogger<T> logger = feedbackLoggerFactory.create();
     for (KeyGroupStatePartitionStreamProvider keyedStateInput : context.getRawKeyedStateInputs()) {
-      this.feedbackLogger.replyLoggedEnvelops(keyedStateInput.getStream(), this);
+      logger.replyLoggedEnvelops(keyedStateInput.getStream(), this);
     }
     //
     // now we can start processing new messages. We do so by registering ourselves as a
@@ -132,7 +136,7 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
   @Override
   public void snapshotState(StateSnapshotContext context) throws Exception {
     super.snapshotState(context);
-    this.feedbackLogger.startLogging(context.getRawKeyedOperatorStateOutput());
+    checkpoints.startLogging(context.getCheckpointId(), context.getRawKeyedOperatorStateOutput());
   }
 
   @Override
@@ -152,8 +156,8 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
   // ------------------------------------------------------------------------------------------------------------------
 
   private void closeInternally() {
-    IOUtils.closeQuietly(feedbackLogger);
-    feedbackLogger = null;
+    IOUtils.closeQuietly(checkpoints);
+    checkpoints = null;
     closedOrDisposed = true;
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java
new file mode 100644
index 0000000..465a717
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.logger;
+
+import java.io.OutputStream;
+
+public interface FeedbackLogger<T> extends AutoCloseable {
+
+  /** Start logging messages into the supplied output stream. */
+  void startLogging(OutputStream keyedStateCheckpointOutputStream);
+
+  /** Append a message to the log currently being written. */
+  void append(T message);
+
+  /** Commit (flush) everything logged so far. */
+  void commit();
+}
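
As a usage note, the interface implies a simple per-checkpoint lifecycle: startLogging binds the logger to an output stream, append buffers in-flight messages, commit flushes them, and close releases resources. A minimal sketch follows; the ByteArrayOutputStream here merely stands in for the raw keyed-state checkpoint stream used in production.

    static void logOneCheckpoint(FeedbackLogger<String> logger) throws Exception {
      // bind the logger to this checkpoint's output stream
      logger.startLogging(new java.io.ByteArrayOutputStream());
      // record messages that are still circulating in the feedback loop
      logger.append("in-flight-envelope-1");
      logger.append("in-flight-envelope-2");
      // flush everything appended so far into the checkpoint
      logger.commit();
      logger.close();
    }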
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
index ef0360a..409f714 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
@@ -38,7 +38,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
 import org.apache.flink.util.IOUtils;
 
-public final class UnboundedFeedbackLogger<T> implements Closeable {
+public final class UnboundedFeedbackLogger<T> implements FeedbackLogger<T> {
   private final Supplier<KeyGroupStream<T>> supplier;
   private final ToIntFunction<T> keyGroupAssigner;
   private final Map<Integer, KeyGroupStream<T>> keyGroupStreams;
@@ -60,6 +60,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
     this.checkpointedStreamOperations = Objects.requireNonNull(ops);
   }
 
+  @Override
   public void startLogging(OutputStream keyedStateCheckpointOutputStream) {
     this.checkpointedStreamOperations.requireKeyedStateCheckpointed(
         keyedStateCheckpointOutputStream);
@@ -68,6 +69,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
         checkpointedStreamOperations.acquireLease(keyedStateCheckpointOutputStream);
   }
 
+  @Override
   public void append(T message) {
     if (keyedStateOutputStream == null) {
       //
@@ -79,6 +81,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
     keyGroup.append(message);
   }
 
+  @Override
   public void commit() {
     try {
       flushToKeyedStateOutputStream();
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java
new file mode 100644
index 0000000..7b8dbee
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.feedback;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;
+
+public class CheckpointsTest {
+
+  @Test
+  public void usageExample() {
+    Loggers loggers = new Loggers();
+
+    Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+    checkpoints.startLogging(1, new ByteArrayOutputStream());
+    checkpoints.append("hello");
+    checkpoints.append("world");
+    checkpoints.commitCheckpointsUntil(1);
+
+    assertThat(loggers.items(0), contains("hello", "world"));
+    assertThat(loggers.state(0), is(LoggerState.COMMITTED));
+  }
+
+  @Test
+  public void dataIsAppendedToMultipleLoggers() {
+    Loggers loggers = new Loggers();
+
+    Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+
+    checkpoints.startLogging(1, new ByteArrayOutputStream());
+    checkpoints.append("a");
+
+    checkpoints.startLogging(2, new ByteArrayOutputStream());
+    checkpoints.append("b");
+
+    checkpoints.commitCheckpointsUntil(1);
+    checkpoints.append("c");
+
+    checkpoints.commitCheckpointsUntil(2);
+
+    assertThat(loggers.items(0), contains("a", "b"));
+    assertThat(loggers.items(1), contains("b", "c"));
+  }
+
+  @Test
+  public void committingALaterCheckpointCommitsPreviousCheckpoints() {
+    Loggers loggers = new Loggers();
+
+    Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+
+    checkpoints.startLogging(1, new ByteArrayOutputStream());
+    checkpoints.startLogging(2, new ByteArrayOutputStream());
+    checkpoints.commitCheckpointsUntil(2);
+
+    assertThat(loggers.state(0), is(LoggerState.COMMITTED));
+    assertThat(loggers.state(1), is(LoggerState.COMMITTED));
+  }
+
+  private enum LoggerState {
+    IDLE,
+    LOGGING,
+    COMMITTED,
+    CLOSED
+  }
+
+  private static final class Loggers implements Supplier<FeedbackLogger<String>> {
+    private final List<FakeLogger> loggers = new ArrayList<>();
+
+    @Override
+    public FeedbackLogger<String> get() {
+      FakeLogger logger = new FakeLogger();
+      loggers.add(logger);
+      return logger;
+    }
+
+    List<String> items(int loggerIndex) {
+      Preconditions.checkElementIndex(loggerIndex, loggers.size());
+      FakeLogger logger = loggers.get(loggerIndex);
+      return logger.items;
+    }
+
+    LoggerState state(int loggerIndex) {
+      Preconditions.checkElementIndex(loggerIndex, loggers.size());
+      FakeLogger logger = loggers.get(loggerIndex);
+      return logger.state;
+    }
+  }
+
+  private static final class FakeLogger implements FeedbackLogger<String> {
+
+    List<String> items = new ArrayList<>();
+    LoggerState state = LoggerState.IDLE;
+
+    @Override
+    public void startLogging(OutputStream keyedStateCheckpointOutputStream) {
+      Preconditions.checkState(state == LoggerState.IDLE);
+      state = LoggerState.LOGGING;
+    }
+
+    @Override
+    public void append(String message) {
+      Preconditions.checkState(state != LoggerState.COMMITTED);
+      Preconditions.checkState(state != LoggerState.CLOSED);
+      items.add(message);
+    }
+
+    @Override
+    public void commit() {
+      Preconditions.checkState(state == LoggerState.LOGGING);
+      state = LoggerState.COMMITTED;
+    }
+
+    @Override
+    public void close() {
+      state = LoggerState.CLOSED;
+    }
+  }
+}
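
The tests above pin down the intended semantics: startLogging(id, out) opens a fresh logger per checkpoint, append fans a message out to every still-uncommitted logger, and commitCheckpointsUntil(id) commits (and drops) every logger whose checkpoint id is at most id. A minimal sketch consistent with those tests is shown below; it is illustrative only and may differ in detail from the actual Checkpoints class these tests exercise.

    import java.io.OutputStream;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.function.Supplier;
    import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;
    import org.apache.flink.util.IOUtils;

    final class CheckpointsSketch<T> implements AutoCloseable {
      private final Supplier<FeedbackLogger<T>> loggerFactory;
      // pending loggers keyed by checkpoint id; sorted so that committing
      // "everything up to id N" is a simple head-map walk
      private final TreeMap<Long, FeedbackLogger<T>> uncommitted = new TreeMap<>();

      CheckpointsSketch(Supplier<FeedbackLogger<T>> loggerFactory) {
        this.loggerFactory = loggerFactory;
      }

      void startLogging(long checkpointId, OutputStream out) {
        FeedbackLogger<T> logger = loggerFactory.get();
        logger.startLogging(out);
        uncommitted.put(checkpointId, logger);
      }

      void append(T message) {
        // every in-flight checkpoint still needs this message
        for (FeedbackLogger<T> logger : uncommitted.values()) {
          logger.append(message);
        }
      }

      void commitCheckpointsUntil(long checkpointId) {
        Iterator<Map.Entry<Long, FeedbackLogger<T>>> pending =
            uncommitted.headMap(checkpointId, true).entrySet().iterator();
        while (pending.hasNext()) {
          pending.next().getValue().commit();
          pending.remove();
        }
      }

      @Override
      public void close() {
        // mirrors the operator's closeInternally(), which closes quietly
        uncommitted.values().forEach(IOUtils::closeQuietly);
        uncommitted.clear();
      }
    }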


[flink-statefun] 16/17: [hotfix] Remove unused code

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit c97b706a83db7e0db278c4cb8c19751cd4d9345f
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 23:18:33 2020 +0200

    [hotfix] Remove unused code
---
 .../flink/core/common/SerializablePredicate.java   | 23 ----------------------
 .../flink/statefun/flink/core/logger/Loggers.java  | 14 -------------
 2 files changed, 37 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/SerializablePredicate.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/SerializablePredicate.java
deleted file mode 100644
index 05e6c48..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/SerializablePredicate.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.common;
-
-import java.io.Serializable;
-import java.util.function.Predicate;
-
-public interface SerializablePredicate<T> extends Predicate<T>, Serializable {}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
index 948a808..aee7536 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
@@ -36,20 +36,6 @@ import org.apache.flink.util.ResourceGuard.Lease;
 public final class Loggers {
   private Loggers() {}
 
-  public static UnboundedFeedbackLogger<?> unboundedSpillableLogger(
-      IOManager ioManager,
-      int maxParallelism,
-      long inMemoryMaxBufferSize,
-      TypeSerializer<?> serializer,
-      Function<?, ?> keySelector) {
-
-    UnboundedFeedbackLoggerFactory<?> factory =
-        unboundedSpillableLoggerFactory(
-            ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector);
-
-    return factory.create();
-  }
-
   public static UnboundedFeedbackLoggerFactory<?> unboundedSpillableLoggerFactory(
       IOManager ioManager,
       int maxParallelism,