You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/12/03 08:24:26 UTC
[flink-statefun] 01/03: [FLINK-20303] [e2e] Add a SmokeE2E test
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit b86616bae4ce58dbdeebb263810de9a9c85fff61
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Fri Nov 20 21:20:36 2020 +0100
[FLINK-20303] [e2e] Add a SmokeE2E test
This closes #178.
---
statefun-e2e-tests/pom.xml | 1 +
statefun-e2e-tests/statefun-smoke-e2e/pom.xml | 143 +++++++++++++
.../flink/statefun/e2e/smoke/AsyncCompleter.java | 105 ++++++++++
.../statefun/e2e/smoke/CommandFlinkSource.java | 233 +++++++++++++++++++++
.../flink/statefun/e2e/smoke/CommandGenerator.java | 162 ++++++++++++++
.../statefun/e2e/smoke/CommandInterpreter.java | 144 +++++++++++++
.../flink/statefun/e2e/smoke/CommandRouter.java | 40 ++++
.../apache/flink/statefun/e2e/smoke/Constants.java | 35 ++++
.../org/apache/flink/statefun/e2e/smoke/Fn.java | 39 ++++
.../flink/statefun/e2e/smoke/FunctionProvider.java | 37 ++++
.../statefun/e2e/smoke/FunctionStateTracker.java | 78 +++++++
.../org/apache/flink/statefun/e2e/smoke/Ids.java | 38 ++++
.../apache/flink/statefun/e2e/smoke/Module.java | 78 +++++++
.../flink/statefun/e2e/smoke/ModuleParameters.java | 193 +++++++++++++++++
.../flink/statefun/e2e/smoke/ProtobufUtils.java | 34 +++
.../src/main/protobuf/commands.proto | 71 +++++++
.../src/main/protobuf/internal.proto | 35 ++++
.../statefun/e2e/smoke/CommandGeneratorTest.java | 40 ++++
.../statefun/e2e/smoke/CommandInterpreterTest.java | 73 +++++++
.../e2e/smoke/FunctionStateTrackerTest.java | 52 +++++
.../flink/statefun/e2e/smoke/HarnessTest.java | 90 ++++++++
.../statefun/e2e/smoke/ModuleParametersTest.java | 47 +++++
.../statefun/e2e/smoke/SimpleProtobufServer.java | 142 +++++++++++++
.../flink/statefun/e2e/smoke/SmokeRunner.java | 73 +++++++
.../statefun/e2e/smoke/SmokeVerificationE2E.java | 34 +++
.../org/apache/flink/statefun/e2e/smoke/Utils.java | 87 ++++++++
.../src/test/resources/Dockerfile | 20 ++
.../src/test/resources/log4j.properties | 24 +++
tools/maven/spotbugs-exclude.xml | 3 +
29 files changed, 2151 insertions(+)
diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index b754f0e..e38a954 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -32,6 +32,7 @@ under the License.
<module>statefun-e2e-tests-common</module>
<module>statefun-sanity-e2e</module>
<module>statefun-exactly-once-remote-e2e</module>
+ <module>statefun-smoke-e2e</module>
</modules>
<build>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e/pom.xml
new file mode 100644
index 0000000..71bb3c3
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>statefun-e2e-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>2.3-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>statefun-smoke-e2e</artifactId>
+
+ <properties>
+ <testcontainers.version>1.12.5</testcontainers.version>
+ <commons-math3.version>3.5</commons-math3.version>
+ </properties>
+
+ <dependencies>
+ <!-- Stateful Functions -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-flink-io</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-flink-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- smoke logic -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>${commons-math3.version}</version>
+ </dependency>
+
+ <!-- Protobuf -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+
+ <!-- streaming runtime -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <!--
+ This artifact transitively depends on different versions of slf4j-api.
+ To see the complete list, comment out this exclusion and run mvn enforcer:enforce.
+ -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <!--
+ This artifact transitively depends on different versions of slf4j-api.
+ To see the complete list, comment out this exclusion and run mvn enforcer:enforce.
+ -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.15</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+
+ <!-- End-to-end test common -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-e2e-tests-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <!-- conflicts with flink-core -->
+ <exclusion>
+ <groupId>com.kohlschutter.junixsocket</groupId>
+ <artifactId>junixsocket-native-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-flink-harness</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protoc-jar-maven-plugin</artifactId>
+ <version>${protoc-jar-maven-plugin.version}</version>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/AsyncCompleter.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/AsyncCompleter.java
new file mode 100644
index 0000000..e16bdc8
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/AsyncCompleter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Creates {@link CompletableFuture}s that are completed, successfully or exceptionally, by a
+ * background thread no sooner than 1 millisecond after they were requested.
+ */
+final class AsyncCompleter {
+
+ private static final Throwable EXCEPTION;
+
+ static {
+ Throwable t = new RuntimeException();
+ t.setStackTrace(new StackTraceElement[0]);
+ EXCEPTION = t;
+ }
+
+ private static final class Task {
+ final long time;
+ final CompletableFuture<Boolean> future;
+ final boolean success;
+
+ public Task(boolean success) {
+ this.time = System.nanoTime();
+ this.future = new CompletableFuture<>();
+ this.success = success;
+ }
+ }
+
+ private static final int ONE_MILLISECOND = Duration.ofMillis(1).getNano();
+ private final LinkedBlockingDeque<Task> queue = new LinkedBlockingDeque<>();
+ private boolean started;
+
+ /**
+ * Returns a future that would be completed successfully, no sooner than 1 millisecond from now.
+ */
+ CompletableFuture<Boolean> successfulFuture() {
+ return future(true);
+ }
+
+ /**
+ * Returns a future that would be completed unsuccessfully, no sooner than 1 millisecond from now.
+ */
+ CompletableFuture<Boolean> failedFuture() {
+ return future(false);
+ }
+
+ private CompletableFuture<Boolean> future(boolean success) {
+ Task e = new Task(success);
+ queue.add(e);
+ return e.future;
+ }
+
+ void start() {
+ if (started) {
+ return;
+ }
+ started = true;
+ Thread t = new Thread(this::run);
+ t.setDaemon(true);
+ t.start();
+ }
+
+ @SuppressWarnings({"InfiniteLoopStatement", "BusyWait"})
+ void run() {
+ while (true) {
+ try {
+ Task e = queue.take();
+ final long duration = System.nanoTime() - e.time;
+ if (duration < ONE_MILLISECOND) {
+ Thread.sleep(1);
+ }
+ CompletableFuture<Boolean> future = e.future;
+ if (e.success) {
+ future.complete(Boolean.TRUE);
+ } else {
+ future.completeExceptionally(EXCEPTION);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandFlinkSource.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandFlinkSource.java
new file mode 100644
index 0000000..ea4ed39
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandFlinkSource.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.generated.Command.Verify;
+import static org.apache.flink.statefun.e2e.smoke.generated.Command.newBuilder;
+
+import com.google.protobuf.Any;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceSnapshot;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink source that emits {@link SourceCommand}s.
+ *
+ * <p>This source is configured by {@link ModuleParameters} and would generate random commands,
+ * addressed to various functions. This source might also throw exceptions (kaboom) to simulate
+ * failures.
+ *
+ * <p>After generating {@link ModuleParameters#getMessageCount()} messages, this source will switch
+ * to {@code verification} step. At this step, it would keep sending (every 2 seconds) a {@link
+ * Verify} command to every function indefinitely.
+ */
+final class CommandFlinkSource extends RichSourceFunction<Any>
+ implements CheckpointedFunction, CheckpointListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CommandFlinkSource.class);
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Configuration
+ // ------------------------------------------------------------------------------------------------------------
+
+ private final ModuleParameters moduleParameters;
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Runtime
+ // ------------------------------------------------------------------------------------------------------------
+
+ private transient ListState<SourceSnapshot> sourceSnapshotHandle;
+ private transient FunctionStateTracker functionStateTracker;
+ private transient int commandsSentSoFar;
+ private transient int failuresSoFar;
+ private transient boolean done;
+ private transient boolean atLeastOneCheckpointCompleted;
+
+ public CommandFlinkSource(ModuleParameters moduleParameters) {
+ this.moduleParameters = Objects.requireNonNull(moduleParameters);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ OperatorStateStore store = context.getOperatorStateStore();
+ sourceSnapshotHandle =
+ store.getUnionListState(new ListStateDescriptor<>("snapshot", SourceSnapshot.class));
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ SourceSnapshot sourceSnapshot =
+ getOnlyElement(sourceSnapshotHandle.get(), SourceSnapshot.getDefaultInstance());
+ functionStateTracker =
+ new FunctionStateTracker(moduleParameters.getNumberOfFunctionInstances())
+ .apply(sourceSnapshot.getTracker());
+ commandsSentSoFar = sourceSnapshot.getCommandsSentSoFarHandle();
+ failuresSoFar = sourceSnapshot.getFailuresGeneratedSoFar();
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ sourceSnapshotHandle.clear();
+ sourceSnapshotHandle.add(
+ SourceSnapshot.newBuilder()
+ .setCommandsSentSoFarHandle(commandsSentSoFar)
+ .setTracker(functionStateTracker.snapshot())
+ .setFailuresGeneratedSoFar(failuresSoFar)
+ .build());
+
+ if (commandsSentSoFar < moduleParameters.getMessageCount()) {
+ double perCent = 100.0d * (commandsSentSoFar) / moduleParameters.getMessageCount();
+ LOG.info(
+ "Commands sent {} / {} ({} %)",
+ commandsSentSoFar, moduleParameters.getMessageCount(), perCent);
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ atLeastOneCheckpointCompleted = true;
+ }
+
+ @Override
+ public void cancel() {
+ done = true;
+ }
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Generation
+ // ------------------------------------------------------------------------------------------------------------
+
+ @Override
+ public void run(SourceContext<Any> ctx) {
+ generate(ctx);
+ do {
+ verify(ctx);
+ snooze();
+ synchronized (ctx.getCheckpointLock()) {
+ if (done) {
+ return;
+ }
+ }
+ } while (true);
+ }
+
+  /** Emits random commands until the message count is reached; may throw a simulated failure. */
+  private void generate(SourceContext<Any> ctx) {
+    final int startPosition = this.commandsSentSoFar;
+    final OptionalInt kaboomIndex =
+        computeFailureIndex(startPosition, failuresSoFar, moduleParameters.getMaxFailures());
+    if (kaboomIndex.isPresent()) {
+      failuresSoFar++;
+    }
+    LOG.info(
+        "starting at {}, kaboom at {}, total messages {}",
+        startPosition, kaboomIndex, moduleParameters.getMessageCount());
+    Supplier<SourceCommand> generator =
+        new CommandGenerator(new JDKRandomGenerator(), moduleParameters);
+    for (int i = startPosition; i < moduleParameters.getMessageCount(); i++) {
+      if (atLeastOneCheckpointCompleted && kaboomIndex.isPresent() && i >= kaboomIndex.getAsInt()) {
+        throw new RuntimeException("KABOOM!!!");
+      }
+      SourceCommand command = generator.get();
+      synchronized (ctx.getCheckpointLock()) {
+        if (done) {
+          return;
+        }
+        functionStateTracker.apply(command);
+        ctx.collect(Any.pack(command));
+        // Command i was just emitted, hence i + 1 commands were sent so far. Storing i would make
+        // a restored source (which resumes at commandsSentSoFar) emit one command too many.
+        this.commandsSentSoFar = i + 1;
+      }
+    }
+  }
+
+ private void verify(SourceContext<Any> ctx) {
+ FunctionStateTracker functionStateTracker = this.functionStateTracker;
+
+ for (int i = 0; i < moduleParameters.getNumberOfFunctionInstances(); i++) {
+ final long expected = functionStateTracker.stateOf(i);
+
+ Command.Builder verify = newBuilder().setVerify(Verify.newBuilder().setExpected(expected));
+
+ SourceCommand command =
+ SourceCommand.newBuilder()
+ .setTarget(i)
+ .setCommands(Commands.newBuilder().addCommand(verify))
+ .build();
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(Any.pack(command));
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------------------------------------------------
+ // Utils
+ // ---------------------------------------------------------------------------------------------------------------
+
+ private OptionalInt computeFailureIndex(int startPosition, int failureSoFar, int maxFailures) {
+ if (failureSoFar >= maxFailures) {
+ return OptionalInt.empty();
+ }
+ if (startPosition >= moduleParameters.getMessageCount()) {
+ return OptionalInt.empty();
+ }
+ int index =
+ ThreadLocalRandom.current().nextInt(startPosition, moduleParameters.getMessageCount());
+ return OptionalInt.of(index);
+ }
+
+ private static void snooze() {
+ try {
+ Thread.sleep(2_000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static <T> T getOnlyElement(Iterable<T> items, T def) {
+ Iterator<T> it = items.iterator();
+ if (!it.hasNext()) {
+ return def;
+ }
+ T item = it.next();
+ if (it.hasNext()) {
+ throw new IllegalStateException("Iterable has additional elements");
+ }
+ return item;
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandGenerator.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandGenerator.java
new file mode 100644
index 0000000..a062de2
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandGenerator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import static java.util.Arrays.asList;
+import static org.apache.commons.math3.util.Pair.create;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Supplier;
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.random.RandomGenerator;
+import org.apache.commons.math3.util.Pair;
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+
+/**
+ * Generates random commands to be interpreted by {@linkplain CommandInterpreter}.
+ *
+ * <p>See {@code src/main/protobuf/commands.proto}.
+ */
+public final class CommandGenerator implements Supplier<SourceCommand> {
+
+ private final RandomGenerator random;
+ private final EnumeratedDistribution<Gen> distribution;
+ private final ModuleParameters moduleParameters;
+
+ public CommandGenerator(RandomGenerator random, ModuleParameters parameters) {
+ this.random = Objects.requireNonNull(random);
+ this.moduleParameters = Objects.requireNonNull(parameters);
+ this.distribution = new EnumeratedDistribution<>(random, randomCommandGenerators());
+ }
+
+ @Override
+ public SourceCommand get() {
+ final int depth = random.nextInt(moduleParameters.getCommandDepth());
+ return SourceCommand.newBuilder().setTarget(address()).setCommands(commands(depth)).build();
+ }
+
+ private Commands.Builder commands(int depth) {
+ Commands.Builder builder = Commands.newBuilder();
+ if (depth <= 0) {
+ StateModifyGen.instance().generate(builder, depth);
+ return builder;
+ }
+ final int n = random.nextInt(moduleParameters.getMaxCommandsPerDepth());
+ for (int i = 0; i < n; i++) {
+ Gen gen = distribution.sample();
+ gen.generate(builder, depth);
+ }
+ if (builder.getCommandCount() == 0) {
+ StateModifyGen.instance().generate(builder, depth);
+ }
+ return builder;
+ }
+
+ private int address() {
+ return random.nextInt(moduleParameters.getNumberOfFunctionInstances());
+ }
+
+ private List<Pair<Gen, Double>> randomCommandGenerators() {
+ return asList(
+ create(new StateModifyGen(), moduleParameters.getStateModificationsPr()),
+ create(new SendGen(), moduleParameters.getSendPr()),
+ create(new SendAfterGen(), moduleParameters.getSendAfterPr()),
+ create(new SendAsyncOp(), moduleParameters.getAsyncSendPr()),
+ create(new Noop(), moduleParameters.getNoopPr()),
+ create(new SendEgress(), moduleParameters.getSendEgressPr()));
+ }
+
+ interface Gen {
+ /** Generates zero or more commands with depth at most {@code depth}. */
+ void generate(Commands.Builder builder, int depth);
+ }
+
+ // ----------------------------------------------------------------------------------------------------
+ // generators
+ // ----------------------------------------------------------------------------------------------------
+
+ private static final class SendEgress implements Gen {
+
+ @Override
+ public void generate(Commands.Builder builder, int depth) {
+ builder.addCommand(
+ Command.newBuilder().setSendEgress(Command.SendEgress.getDefaultInstance()));
+ }
+ }
+
+ private static final class Noop implements Gen {
+ @Override
+ public void generate(Commands.Builder builder, int depth) {}
+ }
+
+ private static final class StateModifyGen implements Gen {
+
+ static final Gen INSTANCE = new StateModifyGen();
+
+ static Gen instance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void generate(Commands.Builder builder, int depth) {
+ builder.addCommand(
+ Command.newBuilder().setIncrement(Command.IncrementState.getDefaultInstance()));
+ }
+ }
+
+ private final class SendAfterGen implements Gen {
+
+ @Override
+ public void generate(Commands.Builder builder, int depth) {
+ builder.addCommand(Command.newBuilder().setSendAfter(sendAfter(depth)));
+ }
+
+ private Command.SendAfter.Builder sendAfter(int depth) {
+ return Command.SendAfter.newBuilder().setTarget(address()).setCommands(commands(depth - 1));
+ }
+ }
+
+ private final class SendGen implements Gen {
+
+ @Override
+ public void generate(Commands.Builder builder, int depth) {
+ builder.addCommand(Command.newBuilder().setSend(send(depth)));
+ }
+
+ private Command.Send.Builder send(int depth) {
+ return Command.Send.newBuilder().setTarget(address()).setCommands(commands(depth - 1));
+ }
+ }
+
+ private final class SendAsyncOp implements Gen {
+
+ @Override
+ public void generate(Commands.Builder builder, int depth) {
+ builder.addCommand(Command.newBuilder().setAsyncOperation(asyncOp(depth)));
+ }
+
+ private Command.AsyncOperation.Builder asyncOp(int depth) {
+ return Command.AsyncOperation.newBuilder()
+ .setFailure(random.nextBoolean())
+ .setResolvedCommands(commands(depth - 1));
+ }
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreter.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreter.java
new file mode 100644
index 0000000..343c8f2
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.ProtobufUtils.unpack;
+
+import com.google.protobuf.Any;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
+import org.apache.flink.statefun.sdk.AsyncOperationResult;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+public final class CommandInterpreter {
+ private final AsyncCompleter asyncCompleter;
+ private final Ids ids;
+ private static final Duration sendAfterDelay = Duration.ofMillis(1);
+
+ public CommandInterpreter(Ids ids) {
+ this.asyncCompleter = new AsyncCompleter();
+ asyncCompleter.start();
+ this.ids = Objects.requireNonNull(ids);
+ }
+
+ public void interpret(PersistedValue<Long> state, Context context, Object message) {
+ if (message instanceof AsyncOperationResult) {
+ @SuppressWarnings("unchecked")
+ AsyncOperationResult<Commands, ?> res = (AsyncOperationResult<Commands, ?>) message;
+ interpret(state, context, res.metadata());
+ return;
+ }
+ if (!(message instanceof Any)) {
+ throw new IllegalArgumentException("wtf " + message);
+ }
+ Any any = (Any) message;
+ if (any.is(SourceCommand.class)) {
+ SourceCommand sourceCommand = unpack(any, SourceCommand.class);
+ interpret(state, context, sourceCommand.getCommands());
+ } else if (any.is(Commands.class)) {
+ Commands commands = unpack(any, Commands.class);
+ interpret(state, context, commands);
+ } else {
+ throw new IllegalArgumentException("Unknown message type " + any.getTypeUrl());
+ }
+ }
+
+ private void interpret(PersistedValue<Long> state, Context context, Commands command) {
+ for (Command cmd : command.getCommandList()) {
+ if (cmd.hasIncrement()) {
+ modifyState(state, context, cmd.getIncrement());
+ } else if (cmd.hasAsyncOperation()) {
+ registerAsyncOps(state, context, cmd.getAsyncOperation());
+ } else if (cmd.hasSend()) {
+ send(state, context, cmd.getSend());
+ } else if (cmd.hasSendAfter()) {
+ sendAfter(state, context, cmd.getSendAfter());
+ } else if (cmd.hasSendEgress()) {
+ sendEgress(state, context, cmd.getSendEgress());
+ } else if (cmd.hasVerify()) {
+ verify(state, context, cmd.getVerify());
+ }
+ }
+ }
+
+  /**
+   * Sends a {@link VerificationResult} holding this function's actual state and the expected one.
+   */
+  private void verify(PersistedValue<Long> state, Context context, Command.Verify verify) {
+    int selfId = Integer.parseInt(context.self().id());
+    long actual = state.getOrDefault(0L);
+    long expected = verify.getExpected();
+    VerificationResult verificationResult =
+        VerificationResult.newBuilder()
+            .setId(selfId)
+            .setActual(actual)
+            .setExpected(expected)
+            .build();
+    context.send(Constants.VERIFICATION_RESULT, Any.pack(verificationResult));
+  }
+
+ private void sendEgress(
+ @SuppressWarnings("unused") PersistedValue<Long> state,
+ Context context,
+ @SuppressWarnings("unused") Command.SendEgress sendEgress) {
+ context.send(Constants.OUT, Any.getDefaultInstance());
+ }
+
+ private void sendAfter(
+ @SuppressWarnings("unused") PersistedValue<Long> state,
+ Context context,
+ Command.SendAfter send) {
+ FunctionType functionType = Constants.FN_TYPE;
+ String id = ids.idOf(send.getTarget());
+ context.sendAfter(sendAfterDelay, functionType, id, Any.pack(send.getCommands()));
+ }
+
+ private void send(
+ @SuppressWarnings("unused") PersistedValue<Long> state, Context context, Command.Send send) {
+ FunctionType functionType = Constants.FN_TYPE;
+ String id = ids.idOf(send.getTarget());
+ context.send(functionType, id, Any.pack(send.getCommands()));
+ }
+
+  /** Registers an async operation that carries the resolved commands as its metadata. */
+  private void registerAsyncOps(
+      @SuppressWarnings("unused") PersistedValue<Long> state,
+      Context context,
+      Command.AsyncOperation asyncOperation) {
+    // A command flagged as a failure must be backed by a future that completes exceptionally.
+    boolean shouldFail = asyncOperation.getFailure();
+    CompletableFuture<Boolean> future =
+        shouldFail ? asyncCompleter.failedFuture() : asyncCompleter.successfulFuture();
+    Commands next = asyncOperation.getResolvedCommands();
+    context.registerAsyncOperation(next, future);
+  }
+
+ private void modifyState(
+ PersistedValue<Long> state,
+ @SuppressWarnings("unused") Context context,
+ @SuppressWarnings("unused") Command.IncrementState incrementState) {
+ state.updateAndGet(n -> n == null ? 1 : n + 1);
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandRouter.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandRouter.java
new file mode 100644
index 0000000..e08ae8d
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandRouter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import com.google.protobuf.Any;
+import java.util.Objects;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.Router;
+
+public class CommandRouter implements Router<Any> {
+ private final Ids ids;
+
+ public CommandRouter(Ids ids) {
+ this.ids = Objects.requireNonNull(ids);
+ }
+
+ @Override
+ public void route(Any any, Downstream<Any> downstream) {
+ SourceCommand sourceCommand = ProtobufUtils.unpack(any, SourceCommand.class);
+ FunctionType type = Constants.FN_TYPE;
+ String id = ids.idOf(sourceCommand.getTarget());
+ downstream.forward(type, id, any);
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Constants.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Constants.java
new file mode 100644
index 0000000..f5cf262
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Constants.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import com.google.protobuf.Any;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+
+public class Constants {
+
+ public static final IngressIdentifier<Any> IN = new IngressIdentifier<>(Any.class, "", "source");
+
+ public static final EgressIdentifier<Any> OUT = new EgressIdentifier<>("", "sink", Any.class);
+
+ public static final FunctionType FN_TYPE = new FunctionType("v", "f1");
+
+ public static final EgressIdentifier<Any> VERIFICATION_RESULT =
+ new EgressIdentifier<>("", "verification", Any.class);
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Fn.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Fn.java
new file mode 100644
index 0000000..369e966
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Fn.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+public class Fn implements StatefulFunction {
+
+ @Persisted private final PersistedValue<Long> state = PersistedValue.of("state", Long.class);
+ private final CommandInterpreter interpreter;
+
+ public Fn(CommandInterpreter interpreter) {
+ this.interpreter = Objects.requireNonNull(interpreter);
+ }
+
+ @Override
+ public void invoke(Context context, Object message) {
+ interpreter.interpret(state, context, message);
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionProvider.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionProvider.java
new file mode 100644
index 0000000..a7eda2a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+
+public class FunctionProvider implements StatefulFunctionProvider {
+ private final Ids ids;
+
+ public FunctionProvider(Ids ids) {
+ this.ids = Objects.requireNonNull(ids);
+ }
+
+ @Override
+ public StatefulFunction functionOfType(FunctionType functionType) {
+ CommandInterpreter interpreter = new CommandInterpreter(ids);
+ return new Fn(interpreter);
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTracker.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTracker.java
new file mode 100644
index 0000000..d836094
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTracker.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.FunctionTrackerSnapshot;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+
+final class FunctionStateTracker {
+ private final long[] expectedStates; // expected state of each function instance, indexed by its address
+
+ public FunctionStateTracker(int numberOfFunctionInstances) {
+ this.expectedStates = new long[numberOfFunctionInstances];
+ }
+
+ /**
+ * Find any state modification commands nested under {@code sourceCommand}, and apply them to the
+ * internal state representation.
+ */
+ public void apply(SourceCommand sourceCommand) {
+ updateInternally(sourceCommand.getTarget(), sourceCommand.getCommands());
+ }
+
+ /** Add every per-function state stored in the given snapshot to the tracked expected states. */
+ public FunctionStateTracker apply(FunctionTrackerSnapshot snapshot) {
+ for (int i = 0; i < snapshot.getStateCount(); i++) {
+ expectedStates[i] += snapshot.getState(i);
+ }
+ return this;
+ }
+
+ /** Get the current expected state of a function instance. */
+ public long stateOf(int id) {
+ return expectedStates[id];
+ }
+
+ public FunctionTrackerSnapshot.Builder snapshot() {
+ FunctionTrackerSnapshot.Builder snapshot = FunctionTrackerSnapshot.newBuilder();
+ for (long state : expectedStates) {
+ snapshot.addState(state);
+ }
+ return snapshot;
+ }
+
+ /**
+ * Recursively traverse the commands tree and look for {@link Command.IncrementState} commands.
+ * For each {@code IncrementState} command found, increment the expected state of its address.
+ */
+ private void updateInternally(int currentAddress, Commands commands) {
+ for (Command command : commands.getCommandList()) {
+ if (command.hasIncrement()) {
+ expectedStates[currentAddress]++;
+ } else if (command.hasSend()) { // nested commands are attributed to the send target
+ updateInternally(command.getSend().getTarget(), command.getSend().getCommands());
+ } else if (command.hasSendAfter()) { // same attribution as a plain send
+ updateInternally(command.getSendAfter().getTarget(), command.getSendAfter().getCommands());
+ } else if (command.hasAsyncOperation()) { // resolved commands stay on the current address
+ updateInternally(currentAddress, command.getAsyncOperation().getResolvedCommands());
+ }
+ }
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Ids.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Ids.java
new file mode 100644
index 0000000..b9fbc9f
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Ids.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+final class Ids {
+ private final String[] cache;
+
+ public Ids(int maxIds) {
+ this.cache = createIds(maxIds);
+ }
+
+ public String idOf(int address) {
+ return cache[address];
+ }
+
+ private static String[] createIds(int maxIds) {
+ String[] ids = new String[maxIds];
+ for (int i = 0; i < maxIds; i++) {
+ ids[i] = Integer.toString(i);
+ }
+ return ids;
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Module.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Module.java
new file mode 100644
index 0000000..21db25b
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Module.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.Constants.IN;
+
+import com.google.auto.service.AutoService;
+import com.google.protobuf.Any;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
+import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@AutoService(StatefulFunctionModule.class)
+public class Module implements StatefulFunctionModule {
+ public static final Logger LOG = LoggerFactory.getLogger(Module.class);
+
+ @Override
+ public void configure(Map<String, String> globalConfiguration, Binder binder) {
+ ModuleParameters moduleParameters = ModuleParameters.from(globalConfiguration);
+ LOG.info(moduleParameters.toString());
+
+ Ids ids = new Ids(moduleParameters.getNumberOfFunctionInstances()); // shared by the router and the function provider
+
+ binder.bindIngress(new SourceFunctionSpec<>(IN, new CommandFlinkSource(moduleParameters)));
+ binder.bindEgress(new SinkFunctionSpec<>(Constants.OUT, new DiscardingSink<>())); // OUT messages are discarded
+ binder.bindIngressRouter(IN, new CommandRouter(ids));
+
+ FunctionProvider provider = new FunctionProvider(ids);
+ binder.bindFunctionProvider(Constants.FN_TYPE, provider);
+
+ SocketClientSink<Any> client = // verification results are written to the configured host:port over a socket
+ new SocketClientSink<>(
+ moduleParameters.getVerificationServerHost(),
+ moduleParameters.getVerificationServerPort(),
+ new VerificationResultSerializer(),
+ 3, // maxNumRetries
+ true); // autoflush
+
+ binder.bindEgress(new SinkFunctionSpec<>(Constants.VERIFICATION_RESULT, client));
+ }
+
+ private static final class VerificationResultSerializer implements SerializationSchema<Any> {
+
+ @Override
+ public byte[] serialize(Any element) {
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream(element.getSerializedSize() + 8); // NOTE(review): +8 presumably leaves room for the length prefix
+ element.writeDelimitedTo(out); // writes the message size as a varint, then the message bytes
+ return out.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ModuleParameters.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ModuleParameters.java
new file mode 100644
index 0000000..0d2126e
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ModuleParameters.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+@SuppressWarnings("unused")
+public final class ModuleParameters implements Serializable {
+
+ private static final long serialVersionUID = 1;
+
+ private int numberOfFunctionInstances = 1_000;
+ private int commandDepth = 10;
+ private int messageCount = 100_000;
+ private int maxCommandsPerDepth = 3;
+ private double stateModificationsPr = 0.4;
+ private double sendPr = 0.9;
+ private double sendAfterPr = 0.1;
+ private double asyncSendPr = 0.1;
+ private double noopPr = 0.2;
+ private double sendEgressPr = 0.03;
+ private int maxFailures = 1;
+ private String verificationServerHost = "localhost";
+ private int verificationServerPort = 5050;
+
+ /** Creates an instance of ModuleParameters from a key-value map. */
+ public static ModuleParameters from(Map<String, String> globalConfiguration) {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return mapper.convertValue(globalConfiguration, ModuleParameters.class);
+ }
+
+ public Map<String, String> asMap() {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.convertValue(this, new TypeReference<Map<String, String>>() {});
+ }
+
+ public int getNumberOfFunctionInstances() {
+ return numberOfFunctionInstances;
+ }
+
+ public void setNumberOfFunctionInstances(int numberOfFunctionInstances) {
+ this.numberOfFunctionInstances = numberOfFunctionInstances;
+ }
+
+ public int getCommandDepth() {
+ return commandDepth;
+ }
+
+ public void setCommandDepth(int commandDepth) {
+ this.commandDepth = commandDepth;
+ }
+
+ public int getMessageCount() {
+ return messageCount;
+ }
+
+ public void setMessageCount(int messageCount) {
+ this.messageCount = messageCount;
+ }
+
+ public int getMaxCommandsPerDepth() {
+ return maxCommandsPerDepth;
+ }
+
+ public void setMaxCommandsPerDepth(int maxCommandsPerDepth) {
+ this.maxCommandsPerDepth = maxCommandsPerDepth;
+ }
+
+ public double getStateModificationsPr() {
+ return stateModificationsPr;
+ }
+
+ public void setStateModificationsPr(double stateModificationsPr) {
+ this.stateModificationsPr = stateModificationsPr;
+ }
+
+ public double getSendPr() {
+ return sendPr;
+ }
+
+ public void setSendPr(double sendPr) {
+ this.sendPr = sendPr;
+ }
+
+ public double getSendAfterPr() {
+ return sendAfterPr;
+ }
+
+ public void setSendAfterPr(double sendAfterPr) {
+ this.sendAfterPr = sendAfterPr;
+ }
+
+ public double getAsyncSendPr() {
+ return asyncSendPr;
+ }
+
+ public void setAsyncSendPr(double asyncSendPr) {
+ this.asyncSendPr = asyncSendPr;
+ }
+
+ public double getNoopPr() {
+ return noopPr;
+ }
+
+ public void setNoopPr(double noopPr) {
+ this.noopPr = noopPr;
+ }
+
+ public double getSendEgressPr() {
+ return sendEgressPr;
+ }
+
+ public void setSendEgressPr(double sendEgressPr) {
+ this.sendEgressPr = sendEgressPr;
+ }
+
+ public void setMaxFailures(int maxFailures) {
+ this.maxFailures = maxFailures;
+ }
+
+ public int getMaxFailures() {
+ return maxFailures;
+ }
+
+ public String getVerificationServerHost() {
+ return verificationServerHost;
+ }
+
+ public void setVerificationServerHost(String verificationServerHost) {
+ this.verificationServerHost = verificationServerHost;
+ }
+
+ public int getVerificationServerPort() {
+ return verificationServerPort;
+ }
+
+ public void setVerificationServerPort(int verificationServerPort) {
+ this.verificationServerPort = verificationServerPort;
+ }
+
+ @Override
+ public String toString() {
+ return "ModuleParameters{"
+ + "numberOfFunctionInstances="
+ + numberOfFunctionInstances
+ + ", commandDepth="
+ + commandDepth
+ + ", messageCount="
+ + messageCount
+ + ", maxCommandsPerDepth="
+ + maxCommandsPerDepth
+ + ", stateModificationsPr="
+ + stateModificationsPr
+ + ", sendPr="
+ + sendPr
+ + ", sendAfterPr="
+ + sendAfterPr
+ + ", asyncSendPr="
+ + asyncSendPr
+ + ", noopPr="
+ + noopPr
+ + ", sendEgressPr="
+ + sendEgressPr
+ + ", maxFailures="
+ + maxFailures
+ + ", verificationServerHost='"
+ + verificationServerHost
+ + '\''
+ + ", verificationServerPort="
+ + verificationServerPort
+ + '}';
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ProtobufUtils.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ProtobufUtils.java
new file mode 100644
index 0000000..25aec2a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ProtobufUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+final class ProtobufUtils {
+ private ProtobufUtils() {}
+
+ public static <T extends Message> T unpack(Any any, Class<T> messageType) {
+ try {
+ return any.unpack(messageType);
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/commands.proto b/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/commands.proto
new file mode 100644
index 0000000..e3912d1
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/commands.proto
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.e2e.smoke;
+option java_package = "org.apache.flink.statefun.e2e.smoke.generated";
+
+option java_multiple_files = true;
+
+message SourceCommand {
+ int32 target = 1;
+ Commands commands = 2;
+}
+
+message Commands {
+ repeated Command command = 1;
+}
+
+message Command {
+ message IncrementState {
+ }
+ message Send {
+ int32 target = 1;
+ Commands commands = 2;
+ }
+ message SendAfter {
+ int32 target = 1;
+ Commands commands = 2;
+ }
+ message SendEgress {
+ }
+ message AsyncOperation {
+ bool failure = 1;
+ Commands resolved_commands = 2;
+ }
+ message Verify {
+ int64 expected = 1;
+ }
+
+ oneof command {
+ IncrementState increment = 1;
+ Send send = 2;
+ SendAfter send_after = 3;
+ SendEgress send_egress = 4;
+ AsyncOperation async_operation = 5;
+ Verify verify = 6;
+ }
+}
+
+message VerificationResult {
+ int32 id = 1;
+ int64 expected = 2;
+ int64 actual = 3;
+}
+
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/internal.proto b/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/internal.proto
new file mode 100644
index 0000000..138fdff
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/internal.proto
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.e2e.smoke;
+option java_package = "org.apache.flink.statefun.e2e.smoke.generated";
+option java_multiple_files = true;
+
+message FunctionTrackerSnapshot {
+ repeated int64 state = 1;
+}
+
+message SourceSnapshot {
+ int32 commandsSentSoFarHandle = 1;
+ int32 failuresGeneratedSoFar = 2;
+ FunctionTrackerSnapshot tracker = 3;
+}
+
+
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandGeneratorTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandGeneratorTest.java
new file mode 100644
index 0000000..49139cc
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandGeneratorTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.junit.Test;
+
+public class CommandGeneratorTest {
+
+ @Test
+ public void usageExample() {
+ ModuleParameters parameters = new ModuleParameters();
+ CommandGenerator generator = new CommandGenerator(new JDKRandomGenerator(), parameters);
+
+ SourceCommand command = generator.get();
+
+ assertThat(command.getTarget(), notNullValue());
+ assertThat(command.getCommands(), notNullValue());
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreterTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreterTest.java
new file mode 100644
index 0000000..1010666
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreterTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.Utils.aStateModificationCommand;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.protobuf.Any;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+import org.junit.Test;
+
+public class CommandInterpreterTest {
+
+ @Test
+ public void exampleUsage() {
+ CommandInterpreter interpreter = new CommandInterpreter(new Ids(10));
+
+ PersistedValue<Long> state = PersistedValue.of("state", Long.class);
+ Context context = new MockContext();
+ SourceCommand sourceCommand = aStateModificationCommand();
+
+ interpreter.interpret(state, context, Any.pack(sourceCommand));
+
+ assertThat(state.get(), is(1L));
+ }
+
+ private static final class MockContext implements Context {
+
+ @Override
+ public Address self() {
+ return null;
+ }
+
+ @Override
+ public Address caller() {
+ return null;
+ }
+
+ @Override
+ public void send(Address address, Object o) {}
+
+ @Override
+ public <T> void send(EgressIdentifier<T> egressIdentifier, T t) {}
+
+ @Override
+ public void sendAfter(Duration duration, Address address, Object o) {}
+
+ @Override
+ public <M, T> void registerAsyncOperation(M m, CompletableFuture<T> completableFuture) {}
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTrackerTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTrackerTest.java
new file mode 100644
index 0000000..97dba1f
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTrackerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.Utils.aRelayedStateModificationCommand;
+import static org.apache.flink.statefun.e2e.smoke.Utils.aStateModificationCommand;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.junit.Test;
+
+public class FunctionStateTrackerTest {
+
+ /** Applying three state modification commands to function 5 leaves its tracked state at 3. */
+ @Test
+ public void exampleUsage() {
+ final FunctionStateTracker stateTracker = new FunctionStateTracker(1_000);
+
+ for (int applied = 0; applied < 3; applied++) {
+ stateTracker.apply(aStateModificationCommand(5));
+ }
+
+ assertThat(stateTracker.stateOf(5), is(3L));
+ }
+
+ /** A relayed modification is accounted to the final target (6), not to the relaying function (5). */
+ @Test
+ public void testRelay() {
+ final int relayingFunction = 5;
+ final int finalTarget = 6;
+ final FunctionStateTracker stateTracker = new FunctionStateTracker(1_000);
+
+ // a nested command: it is addressed to the relaying function, which in turn
+ // forwards the state increment to the final target.
+ stateTracker.apply(aRelayedStateModificationCommand(relayingFunction, finalTarget));
+
+ assertThat(stateTracker.stateOf(relayingFunction), is(0L));
+ assertThat(stateTracker.stateOf(finalTarget), is(1L));
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/HarnessTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/HarnessTest.java
new file mode 100644
index 0000000..88864f8
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/HarnessTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.Utils.awaitVerificationSuccess;
+import static org.apache.flink.statefun.e2e.smoke.Utils.startProtobufServer;
+
+import com.google.protobuf.Any;
+import org.apache.flink.statefun.flink.harness.Harness;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HarnessTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HarnessTest.class);
+
+ /**
+ * Runs the smoke module on a {@link Harness} that is started on a background thread, and blocks
+ * until the verification results received by a local Protobuf server cover all the configured
+ * function instances.
+ */
+ @Ignore
+ @Test(timeout = 1_000 * 60 * 2)
+ public void miniClusterTest() throws Exception {
+ final Harness harness = new Harness();
+ applyFlinkConfiguration(harness);
+
+ // the Protobuf server collects the verification results.
+ final SimpleProtobufServer.StartedServer<Any> verificationServer = startProtobufServer();
+
+ // the parameters of this run, handed over to the module as global configuration.
+ final ModuleParameters moduleParameters = new ModuleParameters();
+ moduleParameters.setMaxFailures(1);
+ moduleParameters.setMessageCount(100_000);
+ moduleParameters.setNumberOfFunctionInstances(128);
+ moduleParameters.setVerificationServerHost("localhost");
+ moduleParameters.setVerificationServerPort(verificationServer.port());
+ moduleParameters.asMap().forEach(harness::withGlobalConfiguration);
+
+ // closing the returned handle interrupts the harness thread.
+ try (AutoCloseable ignored = startHarnessInTheBackground(harness)) {
+ awaitVerificationSuccess(
+ verificationServer.results(), moduleParameters.getNumberOfFunctionInstances());
+ }
+
+ LOG.info("All done.");
+ }
+
+ /** Sets the Flink related configuration (class loading, restarts, checkpointing, parallelism). */
+ private static void applyFlinkConfiguration(Harness harness) {
+ harness.withConfiguration(
+ "classloader.parent-first-patterns.additional",
+ "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
+
+ // restarts
+ harness.withConfiguration("restart-strategy", "fixed-delay");
+ harness.withConfiguration("restart-strategy.fixed-delay.attempts", "2147483647");
+ harness.withConfiguration("restart-strategy.fixed-delay.delay", "1sec");
+
+ // checkpointing
+ harness.withConfiguration("execution.checkpointing.interval", "2sec");
+ harness.withConfiguration("execution.checkpointing.mode", "EXACTLY_ONCE");
+ harness.withConfiguration("execution.checkpointing.max-concurrent-checkpoints", "3");
+
+ harness.withConfiguration("parallelism.default", "2");
+ harness.withConfiguration("state.checkpoints.dir", "file:///tmp/checkpoints");
+ }
+
+ /**
+ * Starts the given harness on a daemon thread named {@code harness-runner}.
+ *
+ * @return a handle that interrupts that thread when closed.
+ */
+ private static AutoCloseable startHarnessInTheBackground(Harness harness) {
+ final Runnable harnessTask =
+ () -> {
+ try {
+ harness.start();
+ } catch (InterruptedException ignored) {
+ LOG.info("Harness Thread was interrupted. Exiting...");
+ } catch (Exception exception) {
+ LOG.info("Something happened while trying to run the Harness.", exception);
+ }
+ };
+
+ final Thread runner = new Thread(harnessTask, "harness-runner");
+ runner.setDaemon(true);
+ runner.start();
+ return runner::interrupt;
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/ModuleParametersTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/ModuleParametersTest.java
new file mode 100644
index 0000000..be5a9b5
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/ModuleParametersTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+import java.util.Map;
+import org.junit.Test;
+
+public class ModuleParametersTest {
+
+ /** A textual {@code messageCount} entry is readable through {@code getMessageCount()}. */
+ @Test
+ public void exampleUsage() {
+ final Map<String, String> rawConfiguration = Collections.singletonMap("messageCount", "1");
+
+ final int messageCount = ModuleParameters.from(rawConfiguration).getMessageCount();
+
+ assertThat(messageCount, is(1));
+ }
+
+ /** A value that was set on the parameters survives {@code asMap()} followed by {@code from()}. */
+ @Test
+ public void roundTrip() {
+ final ModuleParameters before = new ModuleParameters();
+ before.setCommandDepth(1234);
+
+ final Map<String, String> serialized = before.asMap();
+ final ModuleParameters after = ModuleParameters.from(serialized);
+
+ assertThat(after.getCommandDepth(), is(1234));
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SimpleProtobufServer.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SimpleProtobufServer.java
new file mode 100644
index 0000000..58e18bf
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SimpleProtobufServer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple threaded TCP server that is able to receive a specific Protocol Buffers message type.
+ *
+ * <p>Every accepted connection is drained by its own task. Each length-delimited message that is
+ * read from a connection is appended to a shared queue, which is consumed through the supplier
+ * returned by {@link StartedServer#results()}.
+ *
+ * @param <T> input message type.
+ */
+@ThreadSafe
+public final class SimpleProtobufServer<T extends Message> {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleProtobufServer.class);
+
+ /** Messages received from all the clients, in arrival order. */
+ private final LinkedBlockingDeque<T> results = new LinkedBlockingDeque<>();
+
+ /** Runs the accept loop and one reader task per connected client, on daemon threads. */
+ private final ExecutorService executor;
+
+ /** Guards against starting the same server instance more than once. */
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private final Parser<T> parser;
+
+ /** @param parser the parser used to decode the length-delimited messages sent by the clients. */
+ public SimpleProtobufServer(Parser<T> parser) {
+ this.executor = MoreExecutors.newCachedDaemonThreadPool();
+ this.parser = parser;
+ }
+
+ /**
+ * Binds a server socket to an automatically allocated port and starts accepting clients in the
+ * background.
+ *
+ * @return a handle exposing the bound port and a blocking supplier of the received messages.
+ * @throws IllegalArgumentException if this server was already started.
+ * @throws IllegalStateException if the server socket could not be bound.
+ */
+ StartedServer<T> start() {
+ if (!started.compareAndSet(false, true)) {
+ throw new IllegalArgumentException("Already started.");
+ }
+ try {
+ ServerSocket serverSocket = new ServerSocket(0);
+ serverSocket.setReuseAddress(true);
+ LOG.info("Starting server at " + serverSocket.getLocalPort());
+ executor.submit(() -> acceptClients(serverSocket));
+ return new StartedServer<>(serverSocket.getLocalPort(), results());
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to bind the TCP server.", e);
+ }
+ }
+
+ /**
+ * Returns a supplier that blocks until a message is available and then removes and returns it.
+ *
+ * <p>If the waiting thread is interrupted, the supplier restores the interrupt status and throws
+ * a {@link RuntimeException} wrapping the {@link InterruptedException}.
+ */
+ private Supplier<T> results() {
+ return () -> {
+ try {
+ return results.take();
+ } catch (InterruptedException e) {
+ // take() clears the interrupt status when it throws; set it again so that the code
+ // further up the stack can still observe the interruption.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ };
+ }
+
+ /** Accepts clients forever, handing each accepted connection to its own reader task. */
+ @SuppressWarnings("InfiniteLoopStatement")
+ private void acceptClients(ServerSocket serverSocket) {
+ while (true) {
+ try {
+ Socket client = serverSocket.accept();
+ InputStream input = client.getInputStream();
+ executor.submit(() -> pumpVerificationResults(client, input));
+ } catch (IOException e) {
+ LOG.info("Exception while trying to accept a connection.", e);
+ }
+ }
+ }
+
+ /**
+ * Reads length-delimited messages from the client and enqueues them, until the client
+ * disconnects or reading fails. The client socket is closed in both cases.
+ */
+ private void pumpVerificationResults(Socket client, InputStream input) {
+ try {
+ while (true) {
+ T result = parser.parseDelimitedFrom(input);
+ if (result == null) {
+ // null signals the end of the stream, i.e. the client has closed the connection.
+ // Without leaving the loop here this task would spin forever on a dead connection.
+ return;
+ }
+ results.add(result);
+ }
+ } catch (IOException e) {
+ LOG.info(
+ "Exception reading a verification result from "
+ + client.getRemoteSocketAddress()
+ + ", bye...",
+ e);
+ } finally {
+ IOUtils.closeQuietly(client);
+ }
+ }
+
+ /**
+ * A handle to a running server: the port it is bound to, and a supplier of received messages.
+ *
+ * @param <T> input message type.
+ */
+ public static final class StartedServer<T extends Message> {
+ private final int port;
+ private final Supplier<T> results;
+
+ public StartedServer(int port, Supplier<T> results) {
+ this.port = port;
+ this.results = results;
+ }
+
+ /** @return the local port the server socket is bound to. */
+ public int port() {
+ return port;
+ }
+
+ /** @return the supplier of received messages that was handed to this handle. */
+ public Supplier<T> results() {
+ return results;
+ }
+ }
+
+ private static final class MoreExecutors {
+
+ /** Creates a cached thread pool whose threads are all daemon threads. */
+ static ExecutorService newCachedDaemonThreadPool() {
+ return Executors.newCachedThreadPool(
+ r -> {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ return t;
+ });
+ }
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
new file mode 100644
index 0000000..9f2065e
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.Utils.awaitVerificationSuccess;
+import static org.apache.flink.statefun.e2e.smoke.Utils.startProtobufServer;
+
+import com.google.protobuf.Any;
+import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.Testcontainers;
+
+public final class SmokeRunner {
+ private static final Logger LOG = LoggerFactory.getLogger(SmokeRunner.class);
+
+ /**
+ * Runs the smoke application in containers, with the given parameters, and blocks until the
+ * verification results received on the host cover all the configured function instances.
+ *
+ * <p>The verification server host and port of {@code parameters} are overwritten, so that they
+ * point at a Protobuf server started by this method.
+ */
+ public static void run(ModuleParameters parameters) throws Throwable {
+ final SimpleProtobufServer.StartedServer<Any> verificationServer = startProtobufServer();
+ final int verificationPort = verificationServer.port();
+ parameters.setVerificationServerHost("host.testcontainers.internal");
+ parameters.setVerificationServerPort(verificationPort);
+
+ final StatefulFunctionsAppContainers.Builder containers =
+ StatefulFunctionsAppContainers.builder("smoke", 2);
+ containers.exposeMasterLogs(LOG);
+
+ // the test parameters travel as module global configuration, to be
+ // deserialized again at Module#configure()
+ parameters.asMap().forEach(containers::withModuleGlobalConfiguration);
+
+ // make the verification server, listening on the host, reachable from the containers.
+ Testcontainers.exposeHostPorts(verificationPort);
+
+ run(
+ containers.build(),
+ () ->
+ awaitVerificationSuccess(
+ verificationServer.results(), parameters.getNumberOfFunctionInstances()));
+ }
+
+ /**
+ * Wraps {@code r} in a JUnit {@link Statement}, lets {@code app} decorate it via {@code apply},
+ * and evaluates the decorated statement.
+ */
+ private static void run(StatefulFunctionsAppContainers app, ThrowingRunnable<Throwable> r)
+ throws Throwable {
+ final Statement testBody =
+ new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ r.run();
+ }
+ };
+
+ // NOTE(review): presumably apply() brings the containers up and down around the body --
+ // confirm against StatefulFunctionsAppContainers.
+ app.apply(testBody, Description.EMPTY).evaluate();
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeVerificationE2E.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeVerificationE2E.java
new file mode 100644
index 0000000..f1b5b6c
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeVerificationE2E.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import org.junit.Test;
+
+public class SmokeVerificationE2E {
+
+ private static ModuleParameters smokeParameters() {
+ final ModuleParameters smoke = new ModuleParameters();
+ smoke.setNumberOfFunctionInstances(128);
+ smoke.setMessageCount(100_000);
+ smoke.setMaxFailures(1);
+ return smoke;
+ }
+
+ /** Hands the smoke parameters to {@link SmokeRunner#run}, bounded by a ten minute timeout. */
+ @Test(timeout = 1_000 * 60 * 10)
+ public void runWith() throws Throwable {
+ SmokeRunner.run(smokeParameters());
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/Utils.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/Utils.java
new file mode 100644
index 0000000..85f527d
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/Utils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import com.google.protobuf.Any;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
+
+class Utils {
+
+ /** Creates a command that increments the state of an arbitrary function instance. */
+ public static SourceCommand aStateModificationCommand() {
+ return aStateModificationCommand(-1234); // the id doesn't matter
+ }
+
+ /** Creates a command, targeted at the given function instance, that increments its state. */
+ public static SourceCommand aStateModificationCommand(int functionInstanceId) {
+ final Commands.Builder increment = Commands.newBuilder().addCommand(modify());
+ return SourceCommand.newBuilder().setTarget(functionInstanceId).setCommands(increment).build();
+ }
+
+ /**
+ * Creates a command, targeted at the first function, that carries a send command addressed to
+ * the second function, which in turn wraps a state increment.
+ */
+ public static SourceCommand aRelayedStateModificationCommand(
+ int firstFunctionId, int secondFunctionId) {
+ final Commands.Builder relay =
+ Commands.newBuilder().addCommand(sendTo(secondFunctionId, modify()));
+ return SourceCommand.newBuilder().setTarget(firstFunctionId).setCommands(relay).build();
+ }
+
+ /** Wraps {@code body} in a send command addressed to the function with the given id. */
+ private static Command.Builder sendTo(int id, Command.Builder body) {
+ final Commands.Builder payload = Commands.newBuilder().addCommand(body);
+ final Command.Send.Builder send = Command.Send.newBuilder().setTarget(id).setCommands(payload);
+ return Command.newBuilder().setSend(send);
+ }
+
+ /** A command that carries the default state increment. */
+ private static Command.Builder modify() {
+ return Command.newBuilder().setIncrement(Command.IncrementState.getDefaultInstance());
+ }
+
+ /**
+ * Blocks the calling thread until a matching verification result (actual equals expected) was
+ * collected for {@code numberOfFunctionInstances} distinct function ids.
+ *
+ * @throws AssertionError if a result reports an actual value greater than the expected one.
+ */
+ static void awaitVerificationSuccess(Supplier<Any> results, final int numberOfFunctionInstances) {
+ final Set<Integer> verifiedIds = new HashSet<>();
+ while (verifiedIds.size() != numberOfFunctionInstances) {
+ final VerificationResult verification =
+ ProtobufUtils.unpack(results.get(), VerificationResult.class);
+
+ if (verification.getActual() > verification.getExpected()) {
+ throw new AssertionError(
+ "Over counted. Expected: "
+ + verification.getExpected()
+ + ", actual: "
+ + verification.getActual()
+ + ", function: "
+ + verification.getId());
+ }
+ if (verification.getActual() == verification.getExpected()) {
+ verifiedIds.add(verification.getId());
+ }
+ // otherwise the actual value is still below the expected one; wait for the next result.
+ }
+ }
+
+ /** Starts a {@link SimpleProtobufServer} that receives {@link com.google.protobuf.Any}. */
+ static SimpleProtobufServer.StartedServer<Any> startProtobufServer() {
+ return new SimpleProtobufServer<>(Any.parser()).start();
+ }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/Dockerfile
new file mode 100644
index 0000000..3166768
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/Dockerfile
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Build on top of the flink-statefun snapshot image.
+FROM flink-statefun:2.3-SNAPSHOT
+
+# Place the smoke e2e module jar in its own directory under /opt/statefun/modules.
+RUN mkdir -p /opt/statefun/modules/statefun-smoke-e2e
+COPY statefun-smoke-e2e*.jar /opt/statefun/modules/statefun-smoke-e2e/
+# Copy the flink-conf.yaml of the build context into Flink's conf directory.
+COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb965d3
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+# Log all infos to the console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
index 694e52d..ea5ce29 100644
--- a/tools/maven/spotbugs-exclude.xml
+++ b/tools/maven/spotbugs-exclude.xml
@@ -101,5 +101,8 @@ under the License.
<Package name="~org\.apache\.flink\.statefun\.examples\.ridesharing\.simulator.*" />
</Match>
+ <!-- Exclude every package whose name starts with org.apache.flink.statefun.e2e.smoke -->
+ <Match>
+ <Package name="~org\.apache\.flink\.statefun\.e2e\.smoke.*" />
+ </Match>
</FindBugsFilter>