Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/20 07:53:49 UTC

[GitHub] [flink-statefun] tzulitai opened a new pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

tzulitai opened a new pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28
 
 
   This PR achieves the following goals:
   - Adds a `statefun-integration-tests` module that is meant to house integration tests going forward.
   - Adds `statefun-sanity-itcase` to the integration tests module, which includes a simple application used for sanity verification and a JUnit-driven test that performs the verification over a Docker setup.
   - Adds a Maven build profile so that the integration tests can be run with a simple command: `mvn clean verify -Prun-integration-tests`
   
   ---
   
   ### The verification app and test
   
   The verification app (`SanityVerificationModule`) is a simple application used for sanity verification. The application:
   - Reads commands from a Kafka ingress
   - Has multiple functions bound (currently 2 in this PR) that react to the commands (see the class-level Javadoc of `SanityVerificationModule` for a full description of the command set)
   - Reflects any state updates in the functions back to a Kafka egress (a minimal sketch of such a function follows below)
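   
   To make the command semantics concrete, here is a minimal sketch of what such a command-handling function could look like (illustrative only: the class name `CommandResolverFn` and the egress identifier `Constants.STATE_SNAPSHOT_EGRESS` are assumptions, not names from this PR; `Constants.FUNCTION_TYPES` is the function type array from the module):
   
   ```java
   import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
   import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.StateSnapshot;
   import org.apache.flink.statefun.sdk.Address;
   import org.apache.flink.statefun.sdk.Context;
   import org.apache.flink.statefun.sdk.StatefulFunction;
   import org.apache.flink.statefun.sdk.annotations.Persisted;
   import org.apache.flink.statefun.sdk.state.PersistedValue;
   
   final class CommandResolverFn implements StatefulFunction { // hypothetical name
   
     @Persisted
     private final PersistedValue<Integer> state = PersistedValue.of("state", Integer.class);
   
     @Override
     public void invoke(Context context, Object input) {
       Command command = (Command) input;
       if (command.hasModify()) {
         // apply the delta, then reflect the new state value back to the Kafka egress
         int updated = state.getOrDefault(0) + command.getModify().getDelta();
         state.set(updated);
         StateSnapshot snapshot =
             StateSnapshot.newBuilder().setFrom(command.getTarget()).setState(updated).build();
         context.send(Constants.STATE_SNAPSHOT_EGRESS, snapshot); // egress id name assumed
       } else if (command.hasSend()) {
         // forward the nested commands to their target functions
         for (Command toSend : command.getSend().getCommandToSendList()) {
           Address target =
               new Address(
                   Constants.FUNCTION_TYPES[toSend.getTarget().getType()],
                   toSend.getTarget().getId());
           context.send(target, toSend);
         }
       }
       // Noop commands intentionally do nothing.
     }
   }
   ```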
   
   The JUnit-driven test does the following:
   - Uses Testcontainers (https://www.testcontainers.org/) to start a Kafka broker, build the image for the verification app on the fly, and start a master + worker container for the app
   - Writes some commands to the Kafka ingress topic
   - Reads state outputs from the Kafka egress topic, and verifies that they are correct
   
   Right now the test scenario does not have any randomization to it.
   We may consider adding that in the future as a follow-up.
   
   ---
   
   ### Maven `run-integration-tests` build profile and structure setup
   
   With this PR, to add new container-based integration tests in the future, a developer has to do the following:
   - Add a new sub-module under `statefun-integration-tests`
   - Add source code for the Stateful Functions application to test against
   - Add a test class named with the pattern `*ITCase` that sets up the containers using Testcontainers, and implements the test verification logic as a JUnit test. The class name pattern is important: it is what causes the test to be skipped during the unit test phase and to run only with the `run-integration-tests` profile (a minimal skeleton is sketched below).
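   
   A minimal skeleton of such a test class could look like this (a sketch; the package and class names are placeholders, see `SanityVerificationITCase` in this PR for the real thing):
   
   ```java
   package org.apache.flink.statefun.itcases.example; // hypothetical module/package
   
   import org.junit.Rule;
   import org.junit.Test;
   import org.testcontainers.containers.Network;
   
   public class ExampleFeatureITCase { // the *ITCase suffix ties it to the profile
   
     @Rule public Network network = Network.newNetwork();
   
     // Containers for the application under test would be declared here as @Rule
     // fields (Kafka broker, master, workers), as in SanityVerificationITCase.
   
     @Test
     public void run() throws Exception {
       // produce inputs, consume outputs, assert
     }
   }
   ```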
   
   To build while also running the integration tests, simply run
   `mvn clean verify -Prun-integration-tests` in the project root directory.
   
   This always re-builds the base Stateful Functions image before running the ITCases.
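   
   For reference, the profile wiring could look roughly like the following (a sketch assuming the Maven Failsafe plugin is what picks up the `*ITCase` classes during the `integration-test`/`verify` phases; the actual pom in this PR may differ):
   
   ```xml
   <profile>
     <id>run-integration-tests</id>
     <build>
       <plugins>
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-failsafe-plugin</artifactId>
           <executions>
             <execution>
               <goals>
                 <goal>integration-test</goal>
                 <goal>verify</goal>
               </goals>
             </execution>
           </executions>
           <configuration>
             <includes>
               <include>**/*ITCase.java</include>
             </includes>
           </configuration>
         </plugin>
       </plugins>
     </build>
   </profile>
   ```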

----------------------------------------------------------------

[GitHub] [flink-statefun] tzulitai commented on issue #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#issuecomment-589727609
 
 
   @igalshilman FYI I'll address the rest of the comments next week.
   Thanks for the review so far, will ping you again once it is ready.

----------------------------------------------------------------

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#discussion_r381997568
 
 

 ##########
 File path: statefun-integration-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
 ##########
 @@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.FnAddress;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Modify;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Noop;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Send;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.StateSnapshot;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+/**
+ * Sanity verification integration test based on the {@link SanityVerificationModule} application.
+ *
+ * <p>The integration test sets up Kafka brokers and the verification application using Docker, sends
+ * a few commands to Kafka to be consumed by the application, and finally verifies that outputs sent
+ * to Kafka from the application are correct.
+ */
+public class SanityVerificationITCase {
+
+  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
+
+  private static final ImageFromDockerfile verificationAppImage =
+      new ImageFromDockerfile("statefun-sanity-itcase")
+          .withFileFromClasspath("Dockerfile", "Dockerfile")
+          .withFileFromPath(".", Paths.get(System.getProperty("user.dir") + "/target/"));
+
+  @Rule public Network network = Network.newNetwork();
+
+  @Rule
+  public KafkaContainer kafka =
+      new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
+          .withNetwork(network)
+          .withNetworkAliases("kafka-broker");
+
+  @Rule
+  public GenericContainer verificationAppMaster =
+      new GenericContainer(verificationAppImage)
+          .dependsOn(kafka)
+          .withNetwork(network)
+          .withNetworkAliases("master")
+          .withEnv("ROLE", "master")
+          .withEnv("MASTER_HOST", "master");
+
+  @Rule
+  public GenericContainer verificationAppWorker =
+      new GenericContainer(verificationAppImage)
+          .dependsOn(kafka, verificationAppMaster)
+          .withNetwork(network)
+          .withNetworkAliases("worker")
+          .withEnv("ROLE", "worker")
+          .withEnv("MASTER_HOST", "master");
+
+  @Test
+  public void run() throws Exception {
+    final String kafkaAddress = kafka.getBootstrapServers();
+    final ExecutorService kafkaIoExecutor = Executors.newCachedThreadPool();
+
+    kafkaIoExecutor.submit(new ProduceCommands(kafkaAddress));
+    Future<List<StateSnapshot>> stateSnapshotOutputs =
+        kafkaIoExecutor.submit(new ConsumeStateSnapshots(kafkaAddress));
+
+    assertThat(
+        stateSnapshotOutputs.get(1, TimeUnit.MINUTES),
+        hasItems(
 
 Review comment:
   Suggestion:
   It could be nicer if we had the commands present at the same level as the verification.
   For example:
   
   ```
   assertThat(
       sending(
           modifyAction(address1, 100),
           modifyAction(address2, 200)),
       results(
           stateSnapshot(address1, 100),
           stateSnapshot(address2, 200)));
   ```
   
   It is fine the way it is right now; I'm sure we will iterate on these kinds of tests many times in the future.
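   
   One hypothetical way to shape such helpers inside `SanityVerificationITCase` (the names `sending`/`results` and their semantics are assumptions, not an API from this PR):
   
   ```java
   // sending(...) pushes the commands to the ingress topic and returns the
   // snapshots observed on the egress topic; results(...) wraps the expected
   // snapshots in a Hamcrest matcher so the assertion reads declaratively.
   private List<StateSnapshot> sending(Command... commands) throws Exception {
     Producer<FnAddress, Command> producer = kafkaCommandProducer(kafka.getBootstrapServers());
     for (Command command : commands) {
       produceCommandToKafka(producer, command);
     }
     producer.flush();
     return new ConsumeStateSnapshots(kafka.getBootstrapServers()).call();
   }
   
   private static org.hamcrest.Matcher<Iterable<StateSnapshot>> results(StateSnapshot... expected) {
     return hasItems(expected);
   }
   ```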

----------------------------------------------------------------

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#discussion_r381994213
 
 

 ##########
 File path: pom.xml
 ##########
 @@ -55,6 +55,7 @@ under the License.
         <module>statefun-examples</module>
         <module>statefun-flink</module>
         <module>statefun-quickstart</module>
+        <module>statefun-integration-tests</module>
 
 Review comment:
   It seems like an end-to-end test, don't you think?
   

----------------------------------------------------------------

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#discussion_r382886088
 
 

 ##########
 File path: statefun-integration-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
 ##########
 @@ -0,0 +1,287 @@
  [... license header, imports, and container/test setup identical to the quote in the earlier review comment ...]
+    assertThat(
+        stateSnapshotOutputs.get(1, TimeUnit.MINUTES),
+        hasItems(
+            stateSnapshot(fnAddress(0, "id-1"), 100),
+            stateSnapshot(fnAddress(0, "id-2"), 300),
+            stateSnapshot(fnAddress(1, "id-3"), 200),
+            stateSnapshot(fnAddress(0, "id-2"), 350)));
+  }
+
+  // =================================================================================
+  //  Kafka IO utility classes and methods
+  // =================================================================================
+
+  private static class ProduceCommands implements Runnable {
+    private final String kafkaAddress;
+
+    ProduceCommands(String kafkaAddress) {
+      this.kafkaAddress = kafkaAddress;
+    }
+
+    @Override
+    public void run() {
+      Producer<FnAddress, Command> commandProducer = kafkaCommandProducer(kafkaAddress);
+      produceCommandToKafka(commandProducer, modifyAction(fnAddress(0, "id-1"), 100));
+      produceCommandToKafka(commandProducer, modifyAction(fnAddress(0, "id-2"), 300));
+      produceCommandToKafka(commandProducer, modifyAction(fnAddress(1, "id-3"), 200));
+      produceCommandToKafka(
+          commandProducer,
+          sendAction(fnAddress(1, "id-2"), modifyAction(fnAddress(0, "id-2"), 50)));
+      produceCommandToKafka(
+          commandProducer, sendAction(fnAddress(0, "id-1"), noOpAction(fnAddress(1, "id-1"))));
+      commandProducer.flush();
+    }
+  }
+
+  private static class ConsumeStateSnapshots implements Callable<List<StateSnapshot>> {
+
+    private final String kafkaAddress;
+
+    ConsumeStateSnapshots(String kafkaAddress) {
+      this.kafkaAddress = kafkaAddress;
+    }
+
+    @Override
+    public List<StateSnapshot> call() throws Exception {
+      Consumer<FnAddress, StateSnapshot> stateSnapshotConsumer =
+          kafkaStateSnapshotConsumer(kafkaAddress);
+
+      final int expectedOutputs = 4;
+      List<StateSnapshot> responses = new ArrayList<>(expectedOutputs);
+      while (responses.size() < expectedOutputs) {
+        ConsumerRecords<FnAddress, StateSnapshot> stateSnapshots =
+            stateSnapshotConsumer.poll(Duration.ofMillis(100));
+        for (ConsumerRecord<FnAddress, StateSnapshot> stateSnapshot : stateSnapshots) {
+          responses.add(stateSnapshot.value());
+        }
+      }
+
+      return responses;
+    }
+  }
+
+  private static Producer<FnAddress, Command> kafkaCommandProducer(String bootstrapServers) {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", bootstrapServers);
+
+    return new KafkaProducer<>(
+        props, new FnAddressSerializerDeserializer(), new CommandSerializer());
+  }
+
+  private static Consumer<FnAddress, StateSnapshot> kafkaStateSnapshotConsumer(
+      String bootstrapServers) {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
+    consumerProps.setProperty("group.id", "sanity-itcase");
+    consumerProps.setProperty("auto.offset.reset", "earliest");
+
+    KafkaConsumer<FnAddress, StateSnapshot> consumer =
+        new KafkaConsumer<>(
+            consumerProps, new FnAddressSerializerDeserializer(), new StateSnapshotDeserializer());
+    consumer.subscribe(Collections.singletonList(KafkaIO.STATE_SNAPSHOTS_TOPIC_NAME));
+
+    return consumer;
+  }
+
+  private static void produceCommandToKafka(
+      Producer<FnAddress, Command> producer, Command command) {
+    producer.send(new ProducerRecord<>(KafkaIO.COMMAND_TOPIC_NAME, command.getTarget(), command));
+  }
+
+  // =================================================================================
+  //  Protobuf message building utilities
+  // =================================================================================
+
+  private static StateSnapshot stateSnapshot(FnAddress fromFnAddress, int stateSnapshotValue) {
+    return StateSnapshot.newBuilder().setFrom(fromFnAddress).setState(stateSnapshotValue).build();
+  }
+
+  private static Command sendAction(FnAddress targetAddress, Command commandToSend) {
+    final Send sendAction = Send.newBuilder().addCommandToSend(commandToSend).build();
+
+    return Command.newBuilder().setTarget(targetAddress).setSend(sendAction).build();
+  }
+
+  private static Command modifyAction(FnAddress targetAddress, int stateValueDelta) {
+    final Modify modifyAction = Modify.newBuilder().setDelta(stateValueDelta).build();
+
+    return Command.newBuilder().setTarget(targetAddress).setModify(modifyAction).build();
+  }
+
+  private static Command noOpAction(FnAddress targetAddress) {
+    return Command.newBuilder().setTarget(targetAddress).setNoop(Noop.getDefaultInstance()).build();
+  }
+
+  private static FnAddress fnAddress(int typeIndex, String fnId) {
+    if (typeIndex > Constants.FUNCTION_TYPES.length - 1) {
+      throw new IndexOutOfBoundsException(
+          "Type index is out of bounds. Max index: " + (Constants.FUNCTION_TYPES.length - 1));
+    }
+    return FnAddress.newBuilder().setType(typeIndex).setId(fnId).build();
+  }
+
+  // =================================================================================
+  //  Kafka ingress / egress serde
+  // =================================================================================
+
+  public static final class FnAddressSerializerDeserializer
 
 Review comment:
   Turns out there is a nice way to do that!
   See fdcdef1c1a4697bbecb0d9949d002dbe76551f44 for the fix.

----------------------------------------------------------------

[GitHub] [flink-statefun] tzulitai closed pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
tzulitai closed pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28
 
 
   

----------------------------------------------------------------

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#discussion_r381981201
 
 

 ##########
 File path: statefun-integration-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
 ##########
 @@ -0,0 +1,287 @@
  [... license header, imports, and container setup identical to the earlier quotes ...]
+  @Test
+  public void run() throws Exception {
+    final String kafkaAddress = kafka.getBootstrapServers();
+    final ExecutorService kafkaIoExecutor = Executors.newCachedThreadPool();
+
+    kafkaIoExecutor.submit(new ProduceCommands(kafkaAddress));
 
 Review comment:
   You can use `CompletableFuture.supplyAsync(() -> new ProduceCommands(..))`
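   
   A sketch of that reshaping (hypothetical; `call()` declares a checked exception, so the `supplyAsync` supplier needs a small wrapper):
   
   ```java
   CompletableFuture.runAsync(new ProduceCommands(kafkaAddress));
   CompletableFuture<List<StateSnapshot>> stateSnapshotOutputs =
       CompletableFuture.supplyAsync(
           () -> {
             try {
               return new ConsumeStateSnapshots(kafkaAddress).call();
             } catch (Exception e) {
               throw new RuntimeException(e);
             }
           });
   ```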
   

----------------------------------------------------------------

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#discussion_r381981556
 
 

 ##########
 File path: statefun-integration-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
 ##########
 @@ -0,0 +1,287 @@
  [... license header, imports, and container setup identical to the earlier quotes ...]
+  @Test
+  public void run() throws Exception {
+    final String kafkaAddress = kafka.getBootstrapServers();
+    final ExecutorService kafkaIoExecutor = Executors.newCachedThreadPool();
+
+    kafkaIoExecutor.submit(new ProduceCommands(kafkaAddress));
+    Future<List<StateSnapshot>> stateSnapshotOutputs =
+        kafkaIoExecutor.submit(new ConsumeStateSnapshots(kafkaAddress));
+
+    assertThat(
+        stateSnapshotOutputs.get(1, TimeUnit.MINUTES),
 
 Review comment:
   then, this can be `stateSnapshotOutputs.join()`,
   and the test would use the `@Test(timeout = ..)` annotation.
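   
   Combined with the `CompletableFuture` sketch from the previous comment, the test body could end up looking like this (hypothetical shape):
   
   ```java
   @Test(timeout = 60_000) // the one-minute bound moves from get(..) to the annotation
   public void run() {
     final String kafkaAddress = kafka.getBootstrapServers();
     CompletableFuture.runAsync(new ProduceCommands(kafkaAddress));
     CompletableFuture<List<StateSnapshot>> stateSnapshotOutputs =
         CompletableFuture.supplyAsync(
             () -> {
               try {
                 return new ConsumeStateSnapshots(kafkaAddress).call();
               } catch (Exception e) {
                 throw new RuntimeException(e);
               }
             });
   
     assertThat(
         stateSnapshotOutputs.join(),
         hasItems(
             stateSnapshot(fnAddress(0, "id-1"), 100),
             stateSnapshot(fnAddress(0, "id-2"), 300),
             stateSnapshot(fnAddress(1, "id-3"), 200),
             stateSnapshot(fnAddress(0, "id-2"), 350)));
   }
   ```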

----------------------------------------------------------------

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#discussion_r381978154
 
 

 ##########
 File path: statefun-integration-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
 ##########
 @@ -0,0 +1,287 @@
  [... license header, imports, and preceding setup identical to the earlier quotes ...]
+  private static final ImageFromDockerfile verificationAppImage =
+      new ImageFromDockerfile("statefun-sanity-itcase")
+          .withFileFromClasspath("Dockerfile", "Dockerfile")
+          .withFileFromPath(".", Paths.get(System.getProperty("user.dir") + "/target/"));
 
 Review comment:
   Can you turn this into a static function call, with a comment and a log statement around the `withFileFromPath`?
   A log message would reflect the actual `Path` value at runtime, since `System.getProperty` is somewhat non-deterministic (platform specific, etc.)
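   
   A sketch of the suggested refactoring (hypothetical; `LOG` is an assumed SLF4J logger field, and `java.nio.file.Path` would need importing):
   
   ```java
   private static ImageFromDockerfile verificationAppImage() {
     // The Docker build context is the module's target/ directory; log the
     // resolved path because user.dir depends on how the JVM was launched.
     final Path targetDirPath = Paths.get(System.getProperty("user.dir"), "target");
     LOG.info("Building the verification app image with build context {}", targetDirPath);
     return new ImageFromDockerfile("statefun-sanity-itcase")
         .withFileFromClasspath("Dockerfile", "Dockerfile")
         .withFileFromPath(".", targetDirPath);
   }
   ```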
   

----------------------------------------------------------------

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#discussion_r381984266
 
 

 ##########
 File path: statefun-integration-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
 ##########
 @@ -0,0 +1,287 @@
  [... full file contents identical to the quote in the earlier review comment ...]
+  // =================================================================================
+  //  Kafka ingress / egress serde
+  // =================================================================================
+
+  public static final class FnAddressSerializerDeserializer
 
 Review comment:
   Is it possible to replace all of these with a single `ProtobufSerializer<T extends Message>`?
   Not a big deal if it is not easily doable.
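   
   For reference, a generic serde along those lines could look like this (a sketch; it assumes protobuf-java's `Message`/`Parser` API and the Kafka `Serializer`/`Deserializer` interfaces already imported in this test):
   
   ```java
   public static final class ProtobufSerde<T extends com.google.protobuf.Message>
       implements Serializer<T>, Deserializer<T> {
   
     private final com.google.protobuf.Parser<T> parser;
   
     ProtobufSerde(com.google.protobuf.Parser<T> parser) { // e.g. Command.parser()
       this.parser = parser;
     }
   
     @Override
     public byte[] serialize(String topic, T message) {
       return message.toByteArray();
     }
   
     @Override
     public T deserialize(String topic, byte[] data) {
       try {
         return parser.parseFrom(data);
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
         throw new RuntimeException(e);
       }
     }
   
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {}
   
     @Override
     public void close() {}
   }
   ```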

----------------------------------------------------------------

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#discussion_r381986157
 
 

 ##########
 File path: statefun-integration-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
 ##########
 @@ -0,0 +1,287 @@
  [... license header, imports, and Javadoc identical to the earlier quotes ...]
+public class SanityVerificationITCase {
 
 Review comment:
   With what parallelism does this job start?
   Do you think the sanity job should start with parallelism > 1?
   
   

----------------------------------------------------------------

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#discussion_r381978756
 
 

 ##########
 File path: statefun-integration-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
 ##########
 @@ -0,0 +1,287 @@
  [... license header, imports, and preceding setup identical to the earlier quotes ...]
+  @Rule
+  public KafkaContainer kafka =
+      new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
+          .withNetwork(network)
+          .withNetworkAliases("kafka-broker");
 
 Review comment:
   The network alias is tightly coupled to the Kafka address set in the `KafkaIngressSpec` and `KafkaEgressSpec`; can they refer to the same constant?
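   
   For example, both sides could share something like this (hypothetical constant names; the internal listener port is an assumption), with the rule then using `.withNetworkAliases(KAFKA_HOST)`:
   
   ```java
   // shared between the Testcontainers rule and the ingress/egress specs
   static final String KAFKA_HOST = "kafka-broker";
   static final String KAFKA_BOOTSTRAP_SERVERS = KAFKA_HOST + ":9092";
   ```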

----------------------------------------------------------------

[GitHub] [flink-statefun] tzulitai commented on issue #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#issuecomment-590221029
 
 
   @igalshilman all your comments have been addressed, and the PR is ready for review. Since the follow-up commit history got a bit out of control, I figured it would be easier to rebase everything and squash into understandable commits, and therefore had to force push.
   
   Besides some of the more minor comments:
   - I've introduced our own `StatefulFunctionsAppContainers` test rule, based on Testcontainers. This removes a lot of the cumbersome setup code from the actual test code and makes it easier to set up multiple workers in the end-to-end tests. See 86ccc87. I can see this becoming a common utility used by other end-to-end tests in the future.
   
   - The test now runs with parallelism 2. This is done by adding a `flink-conf.yaml` with `parallelism.default=2` to the built application image (see the snippet after this list), and having 2 workers in the `StatefulFunctionsAppContainers` test rule.
   
   - Concerning a more readable assertion of Kafka inputs and outputs (your comment [here](https://github.com/apache/flink-statefun/pull/28#discussion_r381997568)), I gave that a try and came up with a `KafkaIOVerifier` utility. See bb2e0cc. It's not 100% good though; it still needs a good way to verify end-of-output situations (please see the `TODO` in `KafkaIOVerifier`).
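   
   The `flink-conf.yaml` addition mentioned above boils down to a single line baked into the application image:
   
   ```
   parallelism.default: 2
   ```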

----------------------------------------------------------------

[GitHub] [flink-statefun] tzulitai commented on issue #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#issuecomment-590243533
 
 
   Thanks a lot, merging ...

----------------------------------------------------------------

[GitHub] [flink-statefun] igalshilman commented on issue #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

Posted by GitBox <gi...@apache.org>.
igalshilman commented on issue #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28#issuecomment-590240243
 
 
   Thanks for the update @tzulitai, this looks really good!
   Let's merge that and slowly build confidence by adding e2e test cases to cover existing functionality.
   

----------------------------------------------------------------