Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/14 05:20:11 UTC

[GitHub] [flink-statefun] tzulitai opened a new pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

tzulitai opened a new pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108


   This PR adds an E2E test that verifies exactly-once semantics with failure recovery.
   
   ===
   
   Please see the class-level docs of `ExactlyOnceVerificationModule` and `ExactlyOnceE2E` for the specifics of the app used for verification and of the verification scenario.
   
   ===
   
   ## Change log
   
   - 2bca2bd is a refactoring of `StatefulFunctionsAppContainers`. While extending that class with extra functionality required by this new E2E, it became obvious that the class was growing too big and bundling too many responsibilities (test runtime functionality and pre-test configuration). This commit refactors the class using the builder pattern.
   - 785a87f to 344a18 enable checkpointing in apps run by `StatefulFunctionsAppContainers`.
   - db797f4 adds a utility method to restart specific workers at test runtime.
   - 84575f1 adds the verification app.
   - 24489af adds the actual E2E test verification scenario.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-statefun] tzulitai commented on a change in pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#discussion_r425509272



##########
File path: statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
##########
@@ -186,124 +148,220 @@ protected void before() throws Throwable {
   protected void after() {
     master.stop();
     workers.forEach(GenericContainer::stop);
-  }
 
-  private static ImageFromDockerfile appImage(
-      String appName,
-      Configuration dynamicProperties,
-      List<ClasspathBuildContextFile> classpathBuildContextFiles) {
-    final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
-    LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
-
-    final ImageFromDockerfile appImage =
-        new ImageFromDockerfile(appName)
-            .withFileFromClasspath("Dockerfile", "Dockerfile")
-            .withFileFromPath(".", targetDirPath);
-
-    Configuration flinkConf = resolveFlinkConf(dynamicProperties);
-    String flinkConfString = flinkConfigAsString(flinkConf);
-    LOG.info(
-        "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
-        flinkConf);
-    appImage.withFileFromString("flink-conf.yaml", flinkConfString);
-
-    for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) {
-      appImage.withFileFromClasspath(
-          classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath);
-    }
+    FileUtils.deleteDirectoryQuietly(checkpointDir);
+  }
 
-    return appImage;
+  /** @return the exposed port on master for calling REST APIs. */
+  public int getMasterRestPort() {
+    return master.getMappedPort(8081);
   }
 
   /**
-   * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
-   * resources.
+   * Restarts a single worker of this Stateful Functions application.
+   *
+   * @param workerIndex the index of the worker to restart.
    */
-  private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
-    final InputStream baseFlinkConfResourceInputStream =
-        StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
-    if (baseFlinkConfResourceInputStream == null) {
-      throw new RuntimeException("Base flink-conf.yaml cannot be found.");
+  public void restartWorker(int workerIndex) {
+    if (workerIndex >= workers.size()) {
+      throw new IndexOutOfBoundsException(
+          "Invalid worker index; valid values are 0 to " + (workers.size() - 1));
     }
 
-    final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
-    return GlobalConfiguration.loadConfiguration(
-        tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
+    final GenericContainer<?> worker = workers.get(workerIndex);
+    worker.stop();
+    worker.start();
+  }
+
+  private static File temporaryCheckpointDir() throws IOException {

Review comment:
       I thought about that, but Testcontainers does not support named volumes, only bind mounts. I guess that's because the docker-java API itself doesn't support named volumes, so Testcontainers, which is built on top of it, doesn't support them either.
   
   However, even with named volumes, we would ideally still treat them as temporary resources that are deleted after every test run; we don't want to persist anything beyond the test lifecycle.
   
   So, in that sense, technically there doesn't seem to be a difference between using named volumes and bind-mounting temporary host directories for what we need to do here?
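   For illustration, a temporary host directory of the kind discussed here could be created with the JDK alone before being bind-mounted. This is a minimal sketch, not the PR's actual `temporaryCheckpointDir()`: the class name `CheckpointDirs`, the `statefun-e2e-checkpoints-` prefix, and the decision to open up POSIX permissions (on the assumption that the containerized Flink processes may run under a different user than the host test process) are all hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;

/** Hypothetical helper for illustration only; not the PR's implementation. */
public final class CheckpointDirs {

  private CheckpointDirs() {}

  /**
   * Creates a fresh temporary directory on the host that could be bind-mounted into the master and
   * worker containers as a shared checkpoint directory.
   */
  public static File temporaryCheckpointDir() throws IOException {
    final Path dir = Files.createTempDirectory("statefun-e2e-checkpoints-");
    // Assumption: the container processes may run as a different UID than the host test process,
    // so grant write access to everyone where the file system supports POSIX permissions.
    if (FileSystems.getDefault().supportedFileAttributeViews().contains("posix")) {
      Files.setPosixFilePermissions(dir, PosixFilePermissions.fromString("rwxrwxrwx"));
    }
    return dir.toFile();
  }
}
```

   The returned path would then be handed to a bind mount (for example Testcontainers' `withFileSystemBind(hostPath, containerPath)`) and deleted again after the test, which is exactly the lifecycle argued for above.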







[GitHub] [flink-statefun] tzulitai closed pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
tzulitai closed pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108


   





[GitHub] [flink-statefun] tzulitai edited a comment on pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
tzulitai edited a comment on pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#issuecomment-628488917


   It seems like the tests are failing because the temporary checkpoint directories for E2E tests cannot be deleted, most likely due to a permission issue in the Travis build environment.
   I'm currently on it.
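   A quiet delete hides why cleanup failed. One plausible cause (an assumption, not confirmed in this thread) is that a container process running as another user creates subdirectories inside the bind mount that the host user is not allowed to modify. A hedged sketch of a best-effort recursive delete that keeps going and returns the paths it could not remove, so a test could log them, might look like this; `BestEffortDelete` is a hypothetical name and is not the `FileUtils.deleteDirectoryQuietly` used in the PR.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: deletes a tree and reports failures instead of swallowing them. */
public final class BestEffortDelete {

  private BestEffortDelete() {}

  /** @return the paths that could not be deleted (empty if everything was removed). */
  public static List<Path> deleteRecursivelyReportingFailures(Path root) {
    final List<Path> failures = new ArrayList<>();
    if (!Files.exists(root)) {
      return failures;
    }
    try {
      Files.walkFileTree(
          root,
          new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
              tryDelete(file, failures);
              return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult visitFileFailed(Path file, IOException exc) {
              // e.g. a directory the current user is not permitted to read
              failures.add(file);
              return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
              // Attempted even if some children failed; the directory is then reported as well.
              tryDelete(dir, failures);
              return FileVisitResult.CONTINUE;
            }
          });
    } catch (IOException e) {
      failures.add(root);
    }
    return failures;
  }

  private static void tryDelete(Path path, List<Path> failures) {
    try {
      Files.delete(path);
    } catch (IOException e) {
      failures.add(path);
    }
  }
}
```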





[GitHub] [flink-statefun] igalshilman commented on a change in pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#discussion_r425733775



##########
File path: statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
##########
@@ -186,124 +148,220 @@ protected void before() throws Throwable {
   protected void after() {
     master.stop();
     workers.forEach(GenericContainer::stop);
-  }
 
-  private static ImageFromDockerfile appImage(
-      String appName,
-      Configuration dynamicProperties,
-      List<ClasspathBuildContextFile> classpathBuildContextFiles) {
-    final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
-    LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
-
-    final ImageFromDockerfile appImage =
-        new ImageFromDockerfile(appName)
-            .withFileFromClasspath("Dockerfile", "Dockerfile")
-            .withFileFromPath(".", targetDirPath);
-
-    Configuration flinkConf = resolveFlinkConf(dynamicProperties);
-    String flinkConfString = flinkConfigAsString(flinkConf);
-    LOG.info(
-        "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
-        flinkConf);
-    appImage.withFileFromString("flink-conf.yaml", flinkConfString);
-
-    for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) {
-      appImage.withFileFromClasspath(
-          classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath);
-    }
+    FileUtils.deleteDirectoryQuietly(checkpointDir);
+  }
 
-    return appImage;
+  /** @return the exposed port on master for calling REST APIs. */
+  public int getMasterRestPort() {
+    return master.getMappedPort(8081);
   }
 
   /**
-   * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
-   * resources.
+   * Restarts a single worker of this Stateful Functions application.
+   *
+   * @param workerIndex the index of the worker to restart.
    */
-  private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
-    final InputStream baseFlinkConfResourceInputStream =
-        StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
-    if (baseFlinkConfResourceInputStream == null) {
-      throw new RuntimeException("Base flink-conf.yaml cannot be found.");
+  public void restartWorker(int workerIndex) {
+    if (workerIndex >= workers.size()) {
+      throw new IndexOutOfBoundsException(
+          "Invalid worker index; valid values are 0 to " + (workers.size() - 1));
     }
 
-    final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
-    return GlobalConfiguration.loadConfiguration(
-        tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
+    final GenericContainer<?> worker = workers.get(workerIndex);
+    worker.stop();
+    worker.start();
+  }
+
+  private static File temporaryCheckpointDir() throws IOException {

Review comment:
       The difference is very subtle: the named volume (on OSX, and probably on Windows) is created on the same filesystem as the containers (in the Linux VM that runs the Docker engine), and hence things like sharing a Unix domain socket should work (AFAIK).
   Also, it seems cleaner to let Docker handle the whole thing and not touch the host file system directly.
   But I understand the limitation and wouldn't want to overcomplicate this!
   So, good to merge!







[GitHub] [flink-statefun] tzulitai commented on pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
tzulitai commented on pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#issuecomment-628488917


   It seems like the tests are failing because the temporary checkpoint directories for E2E tests cannot be deleted, most likely due to a permission issue in the Travis build environment.





[GitHub] [flink-statefun] tzulitai removed a comment on pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
tzulitai removed a comment on pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#issuecomment-628488917


   It seems like the tests are failing because the temporary checkpoint directories for E2E tests cannot be deleted, most likely due to a permission issue in the Travis build environment.
   I'm currently on it.





[GitHub] [flink-statefun] tzulitai commented on pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
tzulitai commented on pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#issuecomment-629018405


   @igalshilman I've addressed all comments except for the one about using Docker named volumes. Please let me know what you think, and if there are no further objections, I'll merge this PR. Thank you!





[GitHub] [flink-statefun] igalshilman commented on a change in pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#discussion_r425158368



##########
File path: statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/wrapped-messages-ingress-module/module.yaml
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "1.0"
+
+module:
+  meta:
+    type: remote
+  spec:
+    ingresses:

Review comment:
       Is there a reason to split the definitions of the ingress and the egress?
   Why not define the egress here as well, or define the ingress in the Java module too?

##########
File path: statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
+import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
+import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+
+/**
+ * End-to-end test based on the {@link ExactlyOnceVerificationModule} application.
+ *
+ * <p>This test writes some {@link WrappedMessage} records to Kafka, which eventually get routed to
+ * the counter function in the application. Then, after the corresponding {@link InvokeCount}
+ * records are seen in the Kafka egress (which implies some checkpoints have been completed since the
+ * verification application is using exactly-once delivery), we restart a worker to simulate
+ * failure. The application should automatically attempt to recover and eventually restart.
+ * Meanwhile, more records are written to Kafka again. We verify that on the consumer side, the
+ * invocation counts increase sequentially for each key as if the failure did not occur.
+ */
+public class ExactlyOnceE2E {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceE2E.class);
+
+  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
+  private static final String KAFKA_HOST = "kafka-broker";
+
+  private static final int NUM_WORKERS = 2;
+
+  /**
+   * Kafka broker. We need to explicitly set the transaction state log replication factor and min
+   * ISR since by default, those values are larger than 1 while we are only using 1 Kafka broker.
+   */
+  @Rule
+  public KafkaContainer kafka =
+      new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
+          .withNetworkAliases(KAFKA_HOST)
+          .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+          .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
+
+  @Rule
+  public StatefulFunctionsAppContainers verificationApp =
+      new StatefulFunctionsAppContainers.Builder("exactly-once-verification", NUM_WORKERS)
+          .dependsOn(kafka)
+          .exposeMasterLogs(LOG)
+          .withBuildContextFileFromClasspath(
+              "wrapped-messages-ingress-module", "/wrapped-messages-ingress-module/")
+          .withModuleGlobalConfiguration(
+              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
+          .build();
+
+  @Test(timeout = 300_000L)
+  public void run() throws Exception {
+    final String kafkaAddress = kafka.getBootstrapServers();
+
+    final Producer<String, WrappedMessage> messageProducer =
+        kafkaWrappedMessagesProducer(kafkaAddress);
+    final Consumer<String, InvokeCount> taggedMessageConsumer =
+        kafkaInvokeCountsConsumer(kafkaAddress);
+
+    final KafkaIOVerifier<String, WrappedMessage, String, InvokeCount> verifier =
+        new KafkaIOVerifier<>(messageProducer, taggedMessageConsumer);
+
+    assertThat(
+        verifier.sending(wrappedMessage("foo"), wrappedMessage("foo"), wrappedMessage("foo")),
+        verifier.resultsInOrder(
+            is(invokeCount("foo", 1)), is(invokeCount("foo", 2)), is(invokeCount("foo", 3))));
+
+    LOG.info(
+        "Restarting random worker to simulate failure. The application should automatically recover.");
+    verificationApp.restartWorker(randomWorkerIndex());
+
+    assertThat(
+        verifier.sending(wrappedMessage("foo"), wrappedMessage("foo"), wrappedMessage("foo")),
+        verifier.resultsInOrder(
+            is(invokeCount("foo", 4)), is(invokeCount("foo", 5)), is(invokeCount("foo", 6))));
+  }
+
+  private static Producer<String, WrappedMessage> kafkaWrappedMessagesProducer(
+      String bootstrapServers) {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", bootstrapServers);
+
+    return new KafkaProducer<>(
+        props, new StringSerializer(), new KafkaProtobufSerializer<>(WrappedMessage.parser()));
+  }
+
+  private Consumer<String, InvokeCount> kafkaInvokeCountsConsumer(String bootstrapServers) {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
+    consumerProps.setProperty("group.id", "exactly-once-e2e");
+    consumerProps.setProperty("auto.offset.reset", "earliest");
+    consumerProps.setProperty("isolation.level", "read_committed");
+
+    KafkaConsumer<String, InvokeCount> consumer =
+        new KafkaConsumer<>(
+            consumerProps,
+            new StringDeserializer(),
+            new KafkaProtobufSerializer<>(InvokeCount.parser()));
+    consumer.subscribe(Collections.singletonList(KafkaIO.INVOKE_COUNTS_TOPIC_NAME));
+
+    return consumer;
+  }
+
+  private static ProducerRecord<String, WrappedMessage> wrappedMessage(String targetInvokeId) {
+    return new ProducerRecord<>(
+        KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME,
+        UUID.randomUUID().toString(),

Review comment:
       👍 
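   The property described in the `ExactlyOnceE2E` class Javadoc (per-key invocation counts increasing by exactly one, with no gaps and no repeats, even across the worker restart) can be expressed as a small standalone check. This is only a sketch: it works on plain `(key, count)` pairs rather than the generated `InvokeCount` protobuf messages, and `SequentialCountCheck` and `Observed` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical checker for the "counts increase sequentially per key" property. */
public final class SequentialCountCheck {

  private SequentialCountCheck() {}

  /** Stand-in for one consumed InvokeCount record. */
  public static final class Observed {
    final String key;
    final int count;

    public Observed(String key, int count) {
      this.key = key;
      this.count = count;
    }
  }

  /**
   * Returns the first violation, if any. For every key, the counts must be exactly 1, 2, 3, ... in
   * consumption order; a repeated count (duplicate processing) and a skipped count (lost
   * invocation) both break the sequence.
   */
  public static Optional<String> firstViolation(List<Observed> records) {
    final Map<String, Integer> lastSeen = new HashMap<>();
    for (Observed r : records) {
      final int expected = lastSeen.getOrDefault(r.key, 0) + 1;
      if (r.count != expected) {
        return Optional.of("key " + r.key + ": expected count " + expected + " but got " + r.count);
      }
      lastSeen.put(r.key, r.count);
    }
    return Optional.empty();
  }
}
```

   Applied to the scenario in the test, the six `foo` records consumed before and after the restart would have to carry counts 1 through 6; a duplicate such as a second count of 3 would be reported as a violation.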

##########
File path: statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
##########
@@ -55,7 +58,7 @@
  *
  *     {@code @Rule}
  *     public StatefulFunctionsAppContainers myApp =
- *         new StatefulFunctionsAppContainers("app-name", 3);
+ *         new StatefulFunctionsAppContainers.Builder("app-name", 3).build();

Review comment:
       nit: It would be nicer to add a static `builder(..)` method to `StatefulFunctionsAppContainers`.
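   The suggestion amounts to a static factory that returns the nested builder, so call sites could read `StatefulFunctionsAppContainers.builder("app-name", 3).build()` instead of `new StatefulFunctionsAppContainers.Builder("app-name", 3).build()`. A self-contained sketch of the pattern follows; `AppContainers` is a hypothetical stand-in whose configuration is reduced to the two constructor arguments, not the real class.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for a test rule configured through a builder. */
public final class AppContainers {

  private final String appName;
  private final int numWorkers;

  private AppContainers(String appName, int numWorkers) {
    this.appName = appName;
    this.numWorkers = numWorkers;
  }

  /** Static entry point, so callers never need to name the nested Builder type. */
  public static Builder builder(String appName, int numWorkers) {
    return new Builder(appName, numWorkers);
  }

  public String appName() {
    return appName;
  }

  public int numWorkers() {
    return numWorkers;
  }

  public static final class Builder {
    private final String appName;
    private final int numWorkers;

    // Private: the enclosing class can still call it, but external callers must use builder(..).
    private Builder(String appName, int numWorkers) {
      this.appName = Objects.requireNonNull(appName, "appName");
      if (numWorkers < 1) {
        throw new IllegalArgumentException("numWorkers must be at least 1");
      }
      this.numWorkers = numWorkers;
    }

    public AppContainers build() {
      return new AppContainers(appName, numWorkers);
    }
  }
}
```

   Keeping the `Builder` constructor private funnels all call sites through `builder(..)`; leaving it public, as in the current diff, would allow both styles.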

##########
File path: statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
##########
@@ -186,124 +148,220 @@ protected void before() throws Throwable {
   protected void after() {
     master.stop();
     workers.forEach(GenericContainer::stop);
-  }
 
-  private static ImageFromDockerfile appImage(
-      String appName,
-      Configuration dynamicProperties,
-      List<ClasspathBuildContextFile> classpathBuildContextFiles) {
-    final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
-    LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
-
-    final ImageFromDockerfile appImage =
-        new ImageFromDockerfile(appName)
-            .withFileFromClasspath("Dockerfile", "Dockerfile")
-            .withFileFromPath(".", targetDirPath);
-
-    Configuration flinkConf = resolveFlinkConf(dynamicProperties);
-    String flinkConfString = flinkConfigAsString(flinkConf);
-    LOG.info(
-        "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
-        flinkConf);
-    appImage.withFileFromString("flink-conf.yaml", flinkConfString);
-
-    for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) {
-      appImage.withFileFromClasspath(
-          classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath);
-    }
+    FileUtils.deleteDirectoryQuietly(checkpointDir);
+  }
 
-    return appImage;
+  /** @return the exposed port on master for calling REST APIs. */
+  public int getMasterRestPort() {
+    return master.getMappedPort(8081);
   }
 
   /**
-   * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
-   * resources.
+   * Restarts a single worker of this Stateful Functions application.
+   *
+   * @param workerIndex the index of the worker to restart.
    */
-  private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
-    final InputStream baseFlinkConfResourceInputStream =
-        StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
-    if (baseFlinkConfResourceInputStream == null) {
-      throw new RuntimeException("Base flink-conf.yaml cannot be found.");
+  public void restartWorker(int workerIndex) {
+    if (workerIndex >= workers.size()) {
+      throw new IndexOutOfBoundsException(
+          "Invalid worker index; valid values are 0 to " + (workers.size() - 1));
     }
 
-    final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
-    return GlobalConfiguration.loadConfiguration(
-        tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
+    final GenericContainer<?> worker = workers.get(workerIndex);
+    worker.stop();
+    worker.start();
+  }
+
+  private static File temporaryCheckpointDir() throws IOException {

Review comment:
       I understand the need, but I'm wondering whether creating temp directories can be avoided here.
   For example, can we create a separate named volume? Does Testcontainers support that?







[GitHub] [flink-statefun] tzulitai commented on a change in pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#discussion_r425512058



##########
File path: statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/wrapped-messages-ingress-module/module.yaml
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "1.0"
+
+module:
+  meta:
+    type: remote
+  spec:
+    ingresses:

Review comment:
       One main reason for not using a YAML-based egress is that it requires an extra dependency on `statefun-flink-io` for the `KafkaProducerRecord` class: the Java function has to send records of type `KafkaProducerRecord` to the egress, as that's the type the YAML generic Kafka egress expects.
   
   In short, as of now, it's a bit awkward to send messages to a YAML-based Kafka egress from Java functions.







[GitHub] [flink-statefun] tzulitai commented on a change in pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#discussion_r425509272



##########
File path: statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
##########
@@ -186,124 +148,220 @@ protected void before() throws Throwable {
   protected void after() {
     master.stop();
     workers.forEach(GenericContainer::stop);
-  }
 
-  private static ImageFromDockerfile appImage(
-      String appName,
-      Configuration dynamicProperties,
-      List<ClasspathBuildContextFile> classpathBuildContextFiles) {
-    final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
-    LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
-
-    final ImageFromDockerfile appImage =
-        new ImageFromDockerfile(appName)
-            .withFileFromClasspath("Dockerfile", "Dockerfile")
-            .withFileFromPath(".", targetDirPath);
-
-    Configuration flinkConf = resolveFlinkConf(dynamicProperties);
-    String flinkConfString = flinkConfigAsString(flinkConf);
-    LOG.info(
-        "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
-        flinkConf);
-    appImage.withFileFromString("flink-conf.yaml", flinkConfString);
-
-    for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) {
-      appImage.withFileFromClasspath(
-          classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath);
-    }
+    FileUtils.deleteDirectoryQuietly(checkpointDir);
+  }
 
-    return appImage;
+  /** @return the exposed port on master for calling REST APIs. */
+  public int getMasterRestPort() {
+    return master.getMappedPort(8081);
   }
 
   /**
-   * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
-   * resources.
+   * Restarts a single worker of this Stateful Functions application.
+   *
+   * @param workerIndex the index of the worker to restart.
    */
-  private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
-    final InputStream baseFlinkConfResourceInputStream =
-        StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
-    if (baseFlinkConfResourceInputStream == null) {
-      throw new RuntimeException("Base flink-conf.yaml cannot be found.");
+  public void restartWorker(int workerIndex) {
+    if (workerIndex >= workers.size()) {
+      throw new IndexOutOfBoundsException(
+          "Invalid worker index; valid values are 0 to " + (workers.size() - 1));
     }
 
-    final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
-    return GlobalConfiguration.loadConfiguration(
-        tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
+    final GenericContainer<?> worker = workers.get(workerIndex);
+    worker.stop();
+    worker.start();
+  }
+
+  private static File temporaryCheckpointDir() throws IOException {

Review comment:
      I thought about that, but Testcontainers does not support named volumes, only bind mounts. I guess that's because the underlying docker-java API doesn't support named volumes either, so Testcontainers, which is built on top of it, can't offer them.
   
  However, even with named volumes, we would ideally still treat them as temporary resources that are deleted after every test run; we don't want to persist anything beyond the test lifecycle.
   
  So, in that sense, for what we need here, there doesn't seem to be a difference between using named volumes and bind-mounting temporary host directories?
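A minimal sketch of the lifecycle described in this comment, assuming only plain JDK file APIs: a uniquely named host directory is created before the test and deleted recursively afterwards, so nothing outlives the run. The class name `TemporaryCheckpointDir` is hypothetical and not part of `StatefulFunctionsAppContainers`. Mounting the directory into the master and worker containers is left out, and the `chk-1/_metadata` file only simulates what a checkpoint might leave behind.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical helper: a host directory that lives for exactly one test run. */
final class TemporaryCheckpointDir implements AutoCloseable {
  private final Path dir;

  private TemporaryCheckpointDir(Path dir) {
    this.dir = dir;
  }

  /** Creates a fresh, uniquely named directory under the system temp dir. */
  static TemporaryCheckpointDir create() throws IOException {
    return new TemporaryCheckpointDir(Files.createTempDirectory("statefun-checkpoints-"));
  }

  Path path() {
    return dir;
  }

  /** Deletes the directory and everything in it, children before parents. */
  @Override
  public void close() throws IOException {
    if (!Files.exists(dir)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(dir)) {
      // Reverse order visits deeper paths before their parent directories.
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    }
  }

  public static void main(String[] args) throws IOException {
    Path leftover;
    try (TemporaryCheckpointDir checkpoints = TemporaryCheckpointDir.create()) {
      leftover = checkpoints.path();
      // Simulate files a container might write through the bind mount.
      Files.createDirectories(leftover.resolve("chk-1"));
      Files.write(leftover.resolve("chk-1").resolve("_metadata"), new byte[] {1, 2, 3});
    }
    System.out.println("deleted after test: " + !Files.exists(leftover)); // prints "deleted after test: true"
  }
}
```

Sorting the walk in reverse lexicographic order is what lets `Files.delete` succeed: a file such as `chk-1/_metadata` sorts after its parent `chk-1`, so after reversing it is removed first and every directory is empty by the time it is deleted.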





[GitHub] [flink-statefun] tzulitai commented on a change in pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#discussion_r425509272



##########
File path: statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
##########
@@ -186,124 +148,220 @@ protected void before() throws Throwable {
   protected void after() {
     master.stop();
     workers.forEach(GenericContainer::stop);
-  }
 
-  private static ImageFromDockerfile appImage(
-      String appName,
-      Configuration dynamicProperties,
-      List<ClasspathBuildContextFile> classpathBuildContextFiles) {
-    final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
-    LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
-
-    final ImageFromDockerfile appImage =
-        new ImageFromDockerfile(appName)
-            .withFileFromClasspath("Dockerfile", "Dockerfile")
-            .withFileFromPath(".", targetDirPath);
-
-    Configuration flinkConf = resolveFlinkConf(dynamicProperties);
-    String flinkConfString = flinkConfigAsString(flinkConf);
-    LOG.info(
-        "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
-        flinkConf);
-    appImage.withFileFromString("flink-conf.yaml", flinkConfString);
-
-    for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) {
-      appImage.withFileFromClasspath(
-          classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath);
-    }
+    FileUtils.deleteDirectoryQuietly(checkpointDir);
+  }
 
-    return appImage;
+  /** @return the exposed port on master for calling REST APIs. */
+  public int getMasterRestPort() {
+    return master.getMappedPort(8081);
   }
 
   /**
-   * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
-   * resources.
+   * Restarts a single worker of this Stateful Functions application.
+   *
+   * @param workerIndex the index of the worker to restart.
    */
-  private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
-    final InputStream baseFlinkConfResourceInputStream =
-        StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
-    if (baseFlinkConfResourceInputStream == null) {
-      throw new RuntimeException("Base flink-conf.yaml cannot be found.");
+  public void restartWorker(int workerIndex) {
+    if (workerIndex >= workers.size()) {
+      throw new IndexOutOfBoundsException(
+          "Invalid worker index; valid values are 0 to " + (workers.size() - 1));
     }
 
-    final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
-    return GlobalConfiguration.loadConfiguration(
-        tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
+    final GenericContainer<?> worker = workers.get(workerIndex);
+    worker.stop();
+    worker.start();
+  }
+
+  private static File temporaryCheckpointDir() throws IOException {

Review comment:
      I thought about that, but Testcontainers does not support named volumes, only bind mounts.
   
  However, even with named volumes, we would ideally still treat them as temporary resources that are deleted after every test run; we don't want to persist anything beyond the test lifecycle.
   
  So, in that sense, for what we need here, there doesn't seem to be a difference between using named volumes and bind-mounting temporary host directories?
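For illustration, the `restartWorker` logic from the diff can be exercised without Docker by substituting a fake worker. `Restartable`, `WorkerCluster` and `RecordingWorker` are hypothetical names standing in for the Testcontainers containers. Unlike the diff, this sketch also rejects negative indices up front; in the diff, a negative index would still fail for typical `List` implementations, but only inside `workers.get(...)` and with the list's own error message.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a container that can be stopped and started again. */
interface Restartable {
  void stop();

  void start();
}

/** Sketch of a cluster handle that restarts a single worker by index. */
final class WorkerCluster {
  private final List<Restartable> workers;

  WorkerCluster(List<? extends Restartable> workers) {
    this.workers = new ArrayList<>(workers);
  }

  void restartWorker(int workerIndex) {
    // Reject negative and too-large indices with the same descriptive message.
    if (workerIndex < 0 || workerIndex >= workers.size()) {
      throw new IndexOutOfBoundsException(
          "Invalid worker index " + workerIndex + "; valid values are 0 to " + (workers.size() - 1));
    }
    Restartable worker = workers.get(workerIndex);
    // Stop first to simulate a worker failure, then bring the same worker back.
    worker.stop();
    worker.start();
  }
}

/** A fake worker that records lifecycle calls so the sketch runs without Docker. */
final class RecordingWorker implements Restartable {
  final List<String> events = new ArrayList<>();

  @Override
  public void stop() {
    events.add("stop");
  }

  @Override
  public void start() {
    events.add("start");
  }
}

final class RestartWorkerDemo {
  public static void main(String[] args) {
    RecordingWorker w0 = new RecordingWorker();
    RecordingWorker w1 = new RecordingWorker();
    WorkerCluster cluster = new WorkerCluster(List.of(w0, w1));

    cluster.restartWorker(1);
    System.out.println(w1.events); // prints "[stop, start]"
    System.out.println(w0.events); // prints "[]"

    try {
      cluster.restartWorker(-1);
    } catch (IndexOutOfBoundsException e) {
      System.out.println(e.getMessage()); // prints "Invalid worker index -1; valid values are 0 to 1"
    }
  }
}
```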





[GitHub] [flink-statefun] tzulitai commented on a change in pull request #108: [FLINK-17516] [e2e] Exactly-once verification E2E

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #108:
URL: https://github.com/apache/flink-statefun/pull/108#discussion_r425513934



##########
File path: statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/wrapped-messages-ingress-module/module.yaml
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "1.0"
+
+module:
+  meta:
+    type: remote
+  spec:
+    ingresses:

Review comment:
      On the other hand, there is no major reason not to use Java for the ingress.
   
  The only reason I used the YAML-based Kafka ingress was that it was easier: there was no need to write a serializer or a router :)
   
  But for consistency, I will move the ingress into the embedded Java module as well.
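To illustrate the extra code this comment refers to, here is a self-contained sketch of the two pieces a Java-defined ingress needs beyond its spec: a deserializer from Kafka record bytes to a message, and a router that picks the target function instance. The interfaces and classes below are hypothetical stand-ins that only mirror that shape; they are not the StateFun SDK types, and the UTF-8 key/value encoding is an assumption made for the example.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical decoded ingress message: the target function instance and its payload. */
final class WrappedMessage {
  final String targetId;
  final String payload;

  WrappedMessage(String targetId, String payload) {
    this.targetId = targetId;
    this.payload = payload;
  }
}

/** Hypothetical deserializer: raw Kafka record key/value bytes to a message. */
interface IngressDeserializer<T> {
  T deserialize(byte[] key, byte[] value);
}

/** Hypothetical router: returns the id of the function instance that should receive the message. */
interface IngressRouter<T> {
  String route(T message);
}

/** Assumes the record key holds the target id and the value holds the payload, both UTF-8. */
final class Utf8WrappedMessageDeserializer implements IngressDeserializer<WrappedMessage> {
  @Override
  public WrappedMessage deserialize(byte[] key, byte[] value) {
    return new WrappedMessage(
        new String(key, StandardCharsets.UTF_8), new String(value, StandardCharsets.UTF_8));
  }
}

/** Routes each message to the function instance named in its target id. */
final class ByTargetIdRouter implements IngressRouter<WrappedMessage> {
  @Override
  public String route(WrappedMessage message) {
    return message.targetId;
  }
}

final class IngressSketchDemo {
  public static void main(String[] args) {
    byte[] key = "counter-7".getBytes(StandardCharsets.UTF_8);
    byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
    WrappedMessage message = new Utf8WrappedMessageDeserializer().deserialize(key, value);
    String target = new ByTargetIdRouter().route(message);
    System.out.println(target + " <- " + message.payload); // prints "counter-7 <- hello"
  }
}
```

With a YAML-defined ingress, equivalent decoding and routing are configured declaratively instead, which is the convenience the comment mentions.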



