You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/12/03 08:24:26 UTC

[flink-statefun] 01/03: [FLINK-20303] [e2e] Add a SmokeE2E test

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit b86616bae4ce58dbdeebb263810de9a9c85fff61
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Fri Nov 20 21:20:36 2020 +0100

    [FLINK-20303] [e2e] Add a SmokeE2E test
    
    This closes #178.
---
 statefun-e2e-tests/pom.xml                         |   1 +
 statefun-e2e-tests/statefun-smoke-e2e/pom.xml      | 143 +++++++++++++
 .../flink/statefun/e2e/smoke/AsyncCompleter.java   | 105 ++++++++++
 .../statefun/e2e/smoke/CommandFlinkSource.java     | 233 +++++++++++++++++++++
 .../flink/statefun/e2e/smoke/CommandGenerator.java | 162 ++++++++++++++
 .../statefun/e2e/smoke/CommandInterpreter.java     | 144 +++++++++++++
 .../flink/statefun/e2e/smoke/CommandRouter.java    |  40 ++++
 .../apache/flink/statefun/e2e/smoke/Constants.java |  35 ++++
 .../org/apache/flink/statefun/e2e/smoke/Fn.java    |  39 ++++
 .../flink/statefun/e2e/smoke/FunctionProvider.java |  37 ++++
 .../statefun/e2e/smoke/FunctionStateTracker.java   |  78 +++++++
 .../org/apache/flink/statefun/e2e/smoke/Ids.java   |  38 ++++
 .../apache/flink/statefun/e2e/smoke/Module.java    |  78 +++++++
 .../flink/statefun/e2e/smoke/ModuleParameters.java | 193 +++++++++++++++++
 .../flink/statefun/e2e/smoke/ProtobufUtils.java    |  34 +++
 .../src/main/protobuf/commands.proto               |  71 +++++++
 .../src/main/protobuf/internal.proto               |  35 ++++
 .../statefun/e2e/smoke/CommandGeneratorTest.java   |  40 ++++
 .../statefun/e2e/smoke/CommandInterpreterTest.java |  73 +++++++
 .../e2e/smoke/FunctionStateTrackerTest.java        |  52 +++++
 .../flink/statefun/e2e/smoke/HarnessTest.java      |  90 ++++++++
 .../statefun/e2e/smoke/ModuleParametersTest.java   |  47 +++++
 .../statefun/e2e/smoke/SimpleProtobufServer.java   | 142 +++++++++++++
 .../flink/statefun/e2e/smoke/SmokeRunner.java      |  73 +++++++
 .../statefun/e2e/smoke/SmokeVerificationE2E.java   |  34 +++
 .../org/apache/flink/statefun/e2e/smoke/Utils.java |  87 ++++++++
 .../src/test/resources/Dockerfile                  |  20 ++
 .../src/test/resources/log4j.properties            |  24 +++
 tools/maven/spotbugs-exclude.xml                   |   3 +
 29 files changed, 2151 insertions(+)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index b754f0e..e38a954 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -32,6 +32,7 @@ under the License.
         <module>statefun-e2e-tests-common</module>
         <module>statefun-sanity-e2e</module>
         <module>statefun-exactly-once-remote-e2e</module>
+        <module>statefun-smoke-e2e</module>
     </modules>
 
     <build>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e/pom.xml
new file mode 100644
index 0000000..71bb3c3
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>statefun-e2e-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>2.3-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-smoke-e2e</artifactId>
+
+    <properties>
+        <testcontainers.version>1.12.5</testcontainers.version>
+        <commons-math3.version>3.5</commons-math3.version>
+    </properties>
+
+    <dependencies>
+        <!-- Stateful Functions -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-io</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- smoke logic -->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>${commons-math3.version}</version>
+        </dependency>
+
+        <!-- Protobuf -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+
+        <!-- streaming runtime -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <!--
+                This artifact transitively depends on different versions of slf4j-api.
+                To see the complete list, comment out this exclusion and run mvn enforcer:enforce.
+                -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <!--
+                This artifact transitively depends on different versions of slf4j-api.
+                To see the complete list, comment out this exclusion and run mvn enforcer:enforce.
+                -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.15</version>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+        </dependency>
+
+        <!-- End-to-end test common -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-e2e-tests-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <!-- conflicts with flink-core -->
+                <exclusion>
+                    <groupId>com.kohlschutter.junixsocket</groupId>
+                    <artifactId>junixsocket-native-common</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-harness</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <version>${protoc-jar-maven-plugin.version}</version>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/AsyncCompleter.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/AsyncCompleter.java
new file mode 100644
index 0000000..e16bdc8
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/AsyncCompleter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Creates {@link CompletableFuture}s that a background daemon thread completes, either
+ * successfully or exceptionally, no sooner than roughly 1 millisecond after they were requested.
+ *
+ * <p>Futures are only completed once {@link #start()} has been called.
+ */
+final class AsyncCompleter {
+
+  /** Shared exception (with an empty stack trace) used to fail futures. */
+  private static final Throwable EXCEPTION;
+
+  static {
+    Throwable t = new RuntimeException();
+    t.setStackTrace(new StackTraceElement[0]);
+    EXCEPTION = t;
+  }
+
+  /** A pending completion request, stamped with the time at which it was created. */
+  private static final class Task {
+    final long time;
+    final CompletableFuture<Boolean> future;
+    final boolean success;
+
+    Task(boolean success) {
+      this.time = System.nanoTime();
+      this.future = new CompletableFuture<>();
+      this.success = success;
+    }
+  }
+
+  /**
+   * Minimal age, in nanoseconds, a task must have before its future is completed.
+   *
+   * <p>{@code toNanos()} is used on purpose: {@code getNano()} only returns the nano-of-second
+   * component of a duration and is correct for 1 millisecond merely by coincidence.
+   */
+  private static final long ONE_MILLISECOND = Duration.ofMillis(1).toNanos();
+
+  private final LinkedBlockingDeque<Task> queue = new LinkedBlockingDeque<>();
+  private boolean started;
+
+  /**
+   * Returns a future that will be completed successfully (with {@code true}), no sooner than 1
+   * millisecond from now.
+   */
+  CompletableFuture<Boolean> successfulFuture() {
+    return future(true);
+  }
+
+  /**
+   * Returns a future that will be completed exceptionally, no sooner than 1 millisecond from now.
+   */
+  CompletableFuture<Boolean> failedFuture() {
+    return future(false);
+  }
+
+  /** Enqueues a new task for the completer thread and hands out its (still pending) future. */
+  private CompletableFuture<Boolean> future(boolean success) {
+    Task task = new Task(success);
+    queue.add(task);
+    return task.future;
+  }
+
+  /** Starts the daemon thread that completes the futures; subsequent calls are no-ops. */
+  void start() {
+    if (started) {
+      return;
+    }
+    started = true;
+    Thread t = new Thread(this::run);
+    t.setDaemon(true);
+    t.start();
+  }
+
+  /**
+   * Completion loop: takes tasks in FIFO order, waits 1 millisecond if the task is younger than
+   * that, and then completes or fails its future.
+   *
+   * <p>If the executing thread is interrupted, the interrupt flag is restored and the loop ends;
+   * futures that are still queued at that point are left uncompleted.
+   */
+  @SuppressWarnings({"InfiniteLoopStatement", "BusyWait"})
+  void run() {
+    try {
+      while (true) {
+        Task task = queue.take();
+        final long age = System.nanoTime() - task.time;
+        if (age < ONE_MILLISECOND) {
+          Thread.sleep(1);
+        }
+        if (task.success) {
+          task.future.complete(Boolean.TRUE);
+        } else {
+          task.future.completeExceptionally(EXCEPTION);
+        }
+      }
+    } catch (InterruptedException interrupted) {
+      // Preserve the interruption status for whoever owns this thread and stop working.
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandFlinkSource.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandFlinkSource.java
new file mode 100644
index 0000000..ea4ed39
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandFlinkSource.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.generated.Command.Verify;
+import static org.apache.flink.statefun.e2e.smoke.generated.Command.newBuilder;
+
+import com.google.protobuf.Any;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceSnapshot;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink Source that Emits {@link SourceCommand}s.
+ *
+ * <p>This source is configured by {@link ModuleParameters} and would generate random commands,
+ * addressed to various functions. This source might also throw exceptions (kaboom) to simulate
+ * failures.
+ *
+ * <p>After generating {@link ModuleParameters#getMessageCount()} messages, this source will switch
+ * to {@code verification} step. At this step, it would keep sending (every 2 seconds) a {@link
+ * Verify} command to every function indefinitely.
+ */
+final class CommandFlinkSource extends RichSourceFunction<Any>
+    implements CheckpointedFunction, CheckpointListener {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CommandFlinkSource.class);
+
+  // ------------------------------------------------------------------------------------------------------------
+  // Configuration
+  // ------------------------------------------------------------------------------------------------------------
+
+  private final ModuleParameters moduleParameters;
+
+  // ------------------------------------------------------------------------------------------------------------
+  // Runtime
+  // ------------------------------------------------------------------------------------------------------------
+
+  private transient ListState<SourceSnapshot> sourceSnapshotHandle;
+  private transient FunctionStateTracker functionStateTracker;
+
+  /** Number of commands emitted so far, i.e. the index of the next command to generate. */
+  private transient int commandsSentSoFar;
+
+  private transient int failuresSoFar;
+
+  /**
+   * Set by {@link #cancel()} and polled by the emitting loops. Volatile because Flink may invoke
+   * {@code cancel()} from a thread other than the one executing {@code run()}.
+   */
+  private transient volatile boolean done;
+
+  /**
+   * Set by {@link #notifyCheckpointComplete(long)} and read by the generation loop outside of the
+   * checkpoint lock, hence volatile.
+   */
+  private transient volatile boolean atLeastOneCheckpointCompleted;
+
+  public CommandFlinkSource(ModuleParameters moduleParameters) {
+    this.moduleParameters = Objects.requireNonNull(moduleParameters);
+  }
+
+  /** Obtains the (union) list state that holds the {@link SourceSnapshot} of this source. */
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws Exception {
+    OperatorStateStore store = context.getOperatorStateStore();
+    sourceSnapshotHandle =
+        store.getUnionListState(new ListStateDescriptor<>("snapshot", SourceSnapshot.class));
+  }
+
+  /**
+   * Restores the runtime fields from the snapshot, or from an empty default snapshot if there is
+   * none.
+   *
+   * @throws IllegalStateException if the state handle holds more than one snapshot.
+   */
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    SourceSnapshot sourceSnapshot =
+        getOnlyElement(sourceSnapshotHandle.get(), SourceSnapshot.getDefaultInstance());
+    functionStateTracker =
+        new FunctionStateTracker(moduleParameters.getNumberOfFunctionInstances())
+            .apply(sourceSnapshot.getTracker());
+    commandsSentSoFar = sourceSnapshot.getCommandsSentSoFarHandle();
+    failuresSoFar = sourceSnapshot.getFailuresGeneratedSoFar();
+  }
+
+  /** Replaces the stored snapshot with the current progress and logs it while still generating. */
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    sourceSnapshotHandle.clear();
+    sourceSnapshotHandle.add(
+        SourceSnapshot.newBuilder()
+            .setCommandsSentSoFarHandle(commandsSentSoFar)
+            .setTracker(functionStateTracker.snapshot())
+            .setFailuresGeneratedSoFar(failuresSoFar)
+            .build());
+
+    if (commandsSentSoFar < moduleParameters.getMessageCount()) {
+      double perCent = 100.0d * (commandsSentSoFar) / moduleParameters.getMessageCount();
+      LOG.info(
+          "Commands sent {} / {} ({} %)",
+          commandsSentSoFar, moduleParameters.getMessageCount(), perCent);
+    }
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {
+    // simulated failures are only thrown once there is a checkpoint to recover from.
+    atLeastOneCheckpointCompleted = true;
+  }
+
+  @Override
+  public void cancel() {
+    done = true;
+  }
+
+  // ------------------------------------------------------------------------------------------------------------
+  // Generation
+  // ------------------------------------------------------------------------------------------------------------
+
+  /**
+   * Emits the random commands first, and then keeps emitting a round of verification commands
+   * every 2 seconds until this source is cancelled.
+   */
+  @Override
+  public void run(SourceContext<Any> ctx) {
+    generate(ctx);
+    while (!done) {
+      verify(ctx);
+      snooze();
+    }
+  }
+
+  /**
+   * Emits the remaining random commands, starting at the restored position.
+   *
+   * <p>If the failure budget is not yet exhausted, a random index is picked at which a {@code
+   * RuntimeException} is thrown to simulate a failure, provided at least one checkpoint has
+   * completed by then.
+   */
+  private void generate(SourceContext<Any> ctx) {
+    final int startPosition = this.commandsSentSoFar;
+    final OptionalInt kaboomIndex =
+        computeFailureIndex(startPosition, failuresSoFar, moduleParameters.getMaxFailures());
+    if (kaboomIndex.isPresent()) {
+      failuresSoFar++;
+    }
+    LOG.info(
+        "starting at {}, kaboom at {}, total messages {}",
+        startPosition,
+        kaboomIndex,
+        moduleParameters.getMessageCount());
+    Supplier<SourceCommand> generator =
+        new CommandGenerator(new JDKRandomGenerator(), moduleParameters);
+    FunctionStateTracker functionStateTracker = this.functionStateTracker;
+    for (int i = startPosition; i < moduleParameters.getMessageCount(); i++) {
+      if (atLeastOneCheckpointCompleted && kaboomIndex.isPresent() && i >= kaboomIndex.getAsInt()) {
+        throw new RuntimeException("KABOOM!!!");
+      }
+      SourceCommand command = generator.get();
+      // tracker update, emission and progress must be atomic with respect to checkpoints.
+      synchronized (ctx.getCheckpointLock()) {
+        if (done) {
+          return;
+        }
+        functionStateTracker.apply(command);
+        ctx.collect(Any.pack(command));
+        // command #i is out, hence i + 1 commands were sent; a restore continues at the next
+        // index instead of repeating index i.
+        this.commandsSentSoFar = i + 1;
+      }
+    }
+  }
+
+  /** Sends every function instance a {@link Verify} command carrying its expected state. */
+  private void verify(SourceContext<Any> ctx) {
+    FunctionStateTracker functionStateTracker = this.functionStateTracker;
+
+    for (int i = 0; i < moduleParameters.getNumberOfFunctionInstances(); i++) {
+      final long expected = functionStateTracker.stateOf(i);
+
+      Command.Builder verify = newBuilder().setVerify(Verify.newBuilder().setExpected(expected));
+
+      SourceCommand command =
+          SourceCommand.newBuilder()
+              .setTarget(i)
+              .setCommands(Commands.newBuilder().addCommand(verify))
+              .build();
+      synchronized (ctx.getCheckpointLock()) {
+        ctx.collect(Any.pack(command));
+      }
+    }
+  }
+
+  // ---------------------------------------------------------------------------------------------------------------
+  // Utils
+  // ---------------------------------------------------------------------------------------------------------------
+
+  /**
+   * Picks a random index in {@code [startPosition, messageCount)} at which to fail.
+   *
+   * @return the index, or empty if the failure budget is used up or nothing is left to generate.
+   */
+  private OptionalInt computeFailureIndex(int startPosition, int failureSoFar, int maxFailures) {
+    if (failureSoFar >= maxFailures) {
+      return OptionalInt.empty();
+    }
+    if (startPosition >= moduleParameters.getMessageCount()) {
+      return OptionalInt.empty();
+    }
+    int index =
+        ThreadLocalRandom.current().nextInt(startPosition, moduleParameters.getMessageCount());
+    return OptionalInt.of(index);
+  }
+
+  /** Sleeps for 2 seconds; an interruption is rethrown as an unchecked exception. */
+  private static void snooze() {
+    try {
+      Thread.sleep(2_000);
+    } catch (InterruptedException e) {
+      // keep the interruption status visible to the code further up the stack.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns the single element of {@code items}, or {@code def} if {@code items} is empty.
+   *
+   * @throws IllegalStateException if there is more than one element.
+   */
+  private static <T> T getOnlyElement(Iterable<T> items, T def) {
+    Iterator<T> it = items.iterator();
+    if (!it.hasNext()) {
+      return def;
+    }
+    T item = it.next();
+    if (it.hasNext()) {
+      throw new IllegalStateException("Iterable has additional elements");
+    }
+    return item;
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandGenerator.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandGenerator.java
new file mode 100644
index 0000000..a062de2
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandGenerator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import static java.util.Arrays.asList;
+import static org.apache.commons.math3.util.Pair.create;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Supplier;
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.random.RandomGenerator;
+import org.apache.commons.math3.util.Pair;
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+
+/**
+ * Generates random commands to be interpreted by {@linkplain CommandInterpreter}.
+ *
+ * <p>see {src/main/protobuf/commands.proto}
+ */
+public final class CommandGenerator implements Supplier<SourceCommand> {
+
+  private final RandomGenerator random;
+  private final EnumeratedDistribution<Gen> distribution;
+  private final ModuleParameters moduleParameters;
+
+  public CommandGenerator(RandomGenerator random, ModuleParameters parameters) {
+    this.random = Objects.requireNonNull(random);
+    this.moduleParameters = Objects.requireNonNull(parameters);
+    // the weighted generators read moduleParameters, so this has to be assigned last.
+    this.distribution = new EnumeratedDistribution<>(random, randomCommandGenerators());
+  }
+
+  /** Produces a command tree of a random depth, addressed to a random function instance. */
+  @Override
+  public SourceCommand get() {
+    // the draws happen in this order: depth, target, and then the commands themselves.
+    final int depth = random.nextInt(moduleParameters.getCommandDepth());
+    final SourceCommand.Builder sourceCommand = SourceCommand.newBuilder();
+    sourceCommand.setTarget(address());
+    sourceCommand.setCommands(commands(depth));
+    return sourceCommand.build();
+  }
+
+  /**
+   * Builds a non-empty batch of commands. For a positive depth a random number of sampled
+   * generators contribute to the batch; a single state modification is used when the depth is
+   * exhausted or when the sampled generators produced nothing.
+   */
+  private Commands.Builder commands(int depth) {
+    final Commands.Builder batch = Commands.newBuilder();
+    if (depth > 0) {
+      int remaining = random.nextInt(moduleParameters.getMaxCommandsPerDepth());
+      while (remaining > 0) {
+        distribution.sample().generate(batch, depth);
+        remaining--;
+      }
+      if (batch.getCommandCount() != 0) {
+        return batch;
+      }
+    }
+    StateModifyGen.instance().generate(batch, depth);
+    return batch;
+  }
+
+  /** Draws the index of a random function instance. */
+  private int address() {
+    return random.nextInt(moduleParameters.getNumberOfFunctionInstances());
+  }
+
+  /** Pairs each command generator with its weight, as configured by the module parameters. */
+  private List<Pair<Gen, Double>> randomCommandGenerators() {
+    return asList(
+        create(new StateModifyGen(), moduleParameters.getStateModificationsPr()),
+        create(new SendGen(), moduleParameters.getSendPr()),
+        create(new SendAfterGen(), moduleParameters.getSendAfterPr()),
+        create(new SendAsyncOp(), moduleParameters.getAsyncSendPr()),
+        create(new Noop(), moduleParameters.getNoopPr()),
+        create(new SendEgress(), moduleParameters.getSendEgressPr()));
+  }
+
+  interface Gen {
+    /** Adds zero or more commands, nested at most {@code depth} levels deep, to the builder. */
+    void generate(Commands.Builder builder, int depth);
+  }
+
+  // ----------------------------------------------------------------------------------------------------
+  // generators
+  // ----------------------------------------------------------------------------------------------------
+
+  /** Adds a command that sends a message to the egress. */
+  private static final class SendEgress implements Gen {
+
+    @Override
+    public void generate(Commands.Builder builder, int depth) {
+      Command.Builder command = Command.newBuilder();
+      command.setSendEgress(Command.SendEgress.getDefaultInstance());
+      builder.addCommand(command);
+    }
+  }
+
+  /** Adds nothing at all. */
+  private static final class Noop implements Gen {
+    @Override
+    public void generate(Commands.Builder builder, int depth) {}
+  }
+
+  /** Adds a command that increments the state of the receiving function. */
+  private static final class StateModifyGen implements Gen {
+
+    static final Gen INSTANCE = new StateModifyGen();
+
+    static Gen instance() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void generate(Commands.Builder builder, int depth) {
+      Command.Builder command = Command.newBuilder();
+      command.setIncrement(Command.IncrementState.getDefaultInstance());
+      builder.addCommand(command);
+    }
+  }
+
+  /** Adds a delayed send of a nested (one level shallower) batch to a random target. */
+  private final class SendAfterGen implements Gen {
+
+    @Override
+    public void generate(Commands.Builder builder, int depth) {
+      // target is drawn before the nested commands are generated.
+      Command.SendAfter.Builder sendAfter = Command.SendAfter.newBuilder();
+      sendAfter.setTarget(address());
+      sendAfter.setCommands(commands(depth - 1));
+      builder.addCommand(Command.newBuilder().setSendAfter(sendAfter));
+    }
+  }
+
+  /** Adds an immediate send of a nested (one level shallower) batch to a random target. */
+  private final class SendGen implements Gen {
+
+    @Override
+    public void generate(Commands.Builder builder, int depth) {
+      // target is drawn before the nested commands are generated.
+      Command.Send.Builder send = Command.Send.newBuilder();
+      send.setTarget(address());
+      send.setCommands(commands(depth - 1));
+      builder.addCommand(Command.newBuilder().setSend(send));
+    }
+  }
+
+  /** Adds an async operation, randomly marked as failing, that resolves to a nested batch. */
+  private final class SendAsyncOp implements Gen {
+
+    @Override
+    public void generate(Commands.Builder builder, int depth) {
+      // the failure flag is drawn before the nested commands are generated.
+      Command.AsyncOperation.Builder asyncOp = Command.AsyncOperation.newBuilder();
+      asyncOp.setFailure(random.nextBoolean());
+      asyncOp.setResolvedCommands(commands(depth - 1));
+      builder.addCommand(Command.newBuilder().setAsyncOperation(asyncOp));
+    }
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreter.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreter.java
new file mode 100644
index 0000000..343c8f2
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.ProtobufUtils.unpack;
+
+import com.google.protobuf.Any;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
+import org.apache.flink.statefun.sdk.AsyncOperationResult;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+/**
+ * Executes the {@link Commands} that are delivered to a function, either packed in a {@link
+ * SourceCommand}, as a bare {@link Commands} message, or as the metadata of a completed {@link
+ * AsyncOperationResult}.
+ */
+public final class CommandInterpreter {
+  private static final Duration SEND_AFTER_DELAY = Duration.ofMillis(1);
+
+  private final AsyncCompleter asyncCompleter;
+  private final Ids ids;
+
+  public CommandInterpreter(Ids ids) {
+    // validate first, so that a rejected argument does not leave a completer thread behind.
+    this.ids = Objects.requireNonNull(ids);
+    this.asyncCompleter = new AsyncCompleter();
+    asyncCompleter.start();
+  }
+
+  /**
+   * Interprets a message received by a function.
+   *
+   * @param state the persisted counter of the receiving function.
+   * @param context the context of the current invocation.
+   * @param message an {@link AsyncOperationResult} whose metadata is a {@link Commands}, or an
+   *     {@link Any} that packs a {@link SourceCommand} or a {@link Commands}.
+   * @throws IllegalArgumentException if the message is of any other type.
+   */
+  public void interpret(PersistedValue<Long> state, Context context, Object message) {
+    if (message instanceof AsyncOperationResult) {
+      // the commands to run were attached as metadata when the operation was registered; they
+      // are executed regardless of the way the operation has completed.
+      @SuppressWarnings("unchecked")
+      AsyncOperationResult<Commands, ?> res = (AsyncOperationResult<Commands, ?>) message;
+      interpret(state, context, res.metadata());
+      return;
+    }
+    if (!(message instanceof Any)) {
+      throw new IllegalArgumentException("wtf " + message);
+    }
+    Any any = (Any) message;
+    if (any.is(SourceCommand.class)) {
+      SourceCommand sourceCommand = unpack(any, SourceCommand.class);
+      interpret(state, context, sourceCommand.getCommands());
+    } else if (any.is(Commands.class)) {
+      Commands commands = unpack(any, Commands.class);
+      interpret(state, context, commands);
+    } else {
+      throw new IllegalArgumentException("Unknown message type " + any.getTypeUrl());
+    }
+  }
+
+  /** Executes the commands one by one, in order; commands of an unset kind are skipped. */
+  private void interpret(PersistedValue<Long> state, Context context, Commands command) {
+    for (Command cmd : command.getCommandList()) {
+      if (cmd.hasIncrement()) {
+        modifyState(state, context, cmd.getIncrement());
+      } else if (cmd.hasAsyncOperation()) {
+        registerAsyncOps(state, context, cmd.getAsyncOperation());
+      } else if (cmd.hasSend()) {
+        send(state, context, cmd.getSend());
+      } else if (cmd.hasSendAfter()) {
+        sendAfter(state, context, cmd.getSendAfter());
+      } else if (cmd.hasSendEgress()) {
+        sendEgress(state, context, cmd.getSendEgress());
+      } else if (cmd.hasVerify()) {
+        verify(state, context, cmd.getVerify());
+      }
+    }
+  }
+
+  /** Reports the actual state next to the expected one to the verification egress. */
+  private void verify(PersistedValue<Long> state, Context context, Command.Verify verify) {
+    int selfId = Integer.parseInt(context.self().id());
+    long actual = state.getOrDefault(0L);
+    long expected = verify.getExpected();
+    VerificationResult verificationResult =
+        VerificationResult.newBuilder()
+            .setId(selfId)
+            .setActual(actual)
+            .setExpected(expected)
+            .build();
+    context.send(Constants.VERIFICATION_RESULT, Any.pack(verificationResult));
+  }
+
+  /** Sends an empty message to the {@code Constants.OUT} egress. */
+  private void sendEgress(
+      @SuppressWarnings("unused") PersistedValue<Long> state,
+      Context context,
+      @SuppressWarnings("unused") Command.SendEgress sendEgress) {
+    context.send(Constants.OUT, Any.getDefaultInstance());
+  }
+
+  /** Forwards the nested commands to the target function, after a delay of 1 millisecond. */
+  private void sendAfter(
+      @SuppressWarnings("unused") PersistedValue<Long> state,
+      Context context,
+      Command.SendAfter send) {
+    FunctionType functionType = Constants.FN_TYPE;
+    String id = ids.idOf(send.getTarget());
+    context.sendAfter(SEND_AFTER_DELAY, functionType, id, Any.pack(send.getCommands()));
+  }
+
+  /** Forwards the nested commands to the target function. */
+  private void send(
+      @SuppressWarnings("unused") PersistedValue<Long> state, Context context, Command.Send send) {
+    FunctionType functionType = Constants.FN_TYPE;
+    String id = ids.idOf(send.getTarget());
+    context.send(functionType, id, Any.pack(send.getCommands()));
+  }
+
+  /**
+   * Registers an async operation that carries the commands to execute once it completes. The
+   * operation fails if, and only if, the command is flagged as a failure.
+   */
+  private void registerAsyncOps(
+      @SuppressWarnings("unused") PersistedValue<Long> state,
+      Context context,
+      Command.AsyncOperation asyncOperation) {
+    CompletableFuture<Boolean> future =
+        asyncOperation.getFailure()
+            ? asyncCompleter.failedFuture()
+            : asyncCompleter.successfulFuture();
+
+    Commands next = asyncOperation.getResolvedCommands();
+    context.registerAsyncOperation(next, future);
+  }
+
+  /** Increments the state by one; an absent state is treated as zero. */
+  private void modifyState(
+      PersistedValue<Long> state,
+      @SuppressWarnings("unused") Context context,
+      @SuppressWarnings("unused") Command.IncrementState incrementState) {
+    state.updateAndGet(n -> n == null ? 1 : n + 1);
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandRouter.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandRouter.java
new file mode 100644
index 0000000..e08ae8d
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandRouter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import com.google.protobuf.Any;
+import java.util.Objects;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.Router;
+
+/**
+ * Routes every ingress message to the function instance named by the {@code target} of the
+ * {@link SourceCommand} it carries. The message itself is forwarded unchanged.
+ */
+public class CommandRouter implements Router<Any> {
+  private final Ids ids;
+
+  public CommandRouter(Ids ids) {
+    this.ids = Objects.requireNonNull(ids);
+  }
+
+  @Override
+  public void route(Any any, Downstream<Any> downstream) {
+    final int target = ProtobufUtils.unpack(any, SourceCommand.class).getTarget();
+    final FunctionType functionType = Constants.FN_TYPE;
+    downstream.forward(functionType, ids.idOf(target), any);
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Constants.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Constants.java
new file mode 100644
index 0000000..f5cf262
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Constants.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import com.google.protobuf.Any;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+
+/**
+ * Identifiers shared across the smoke-test module: the ingress, the egresses and the function
+ * type. This class only holds constants and is not meant to be instantiated or extended.
+ */
+public final class Constants {
+
+  /** Ingress that feeds commands into the application. */
+  public static final IngressIdentifier<Any> IN = new IngressIdentifier<>(Any.class, "", "source");
+
+  /** Egress that receives the messages produced by {@code SendEgress} commands. */
+  public static final EgressIdentifier<Any> OUT = new EgressIdentifier<>("", "sink", Any.class);
+
+  /** Type of the single stateful function used by the smoke test. */
+  public static final FunctionType FN_TYPE = new FunctionType("v", "f1");
+
+  /** Egress bound to the sink that ships verification results to the verification server. */
+  public static final EgressIdentifier<Any> VERIFICATION_RESULT =
+      new EgressIdentifier<>("", "verification", Any.class);
+
+  // Constants holder: prevent instantiation.
+  private Constants() {}
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Fn.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Fn.java
new file mode 100644
index 0000000..369e966
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Fn.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+/**
+ * The stateful function under test. It owns a single persisted {@code Long} value and hands every
+ * incoming message, together with that value and the invocation context, to a {@link
+ * CommandInterpreter}.
+ */
+public class Fn implements StatefulFunction {
+
+  private final CommandInterpreter interpreter;
+
+  @Persisted
+  private final PersistedValue<Long> state = PersistedValue.of("state", Long.class);
+
+  public Fn(CommandInterpreter interpreter) {
+    this.interpreter = Objects.requireNonNull(interpreter);
+  }
+
+  /** Delegates the message to the interpreter; no work is done in the function itself. */
+  @Override
+  public void invoke(Context context, Object message) {
+    interpreter.interpret(state, context, message);
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionProvider.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionProvider.java
new file mode 100644
index 0000000..a7eda2a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+
+/**
+ * Creates {@link Fn} instances. Each created function gets its own {@link CommandInterpreter}
+ * built around the shared {@link Ids} table; the requested function type is not inspected.
+ */
+public class FunctionProvider implements StatefulFunctionProvider {
+  private final Ids ids;
+
+  public FunctionProvider(Ids ids) {
+    this.ids = Objects.requireNonNull(ids);
+  }
+
+  @Override
+  public StatefulFunction functionOfType(FunctionType functionType) {
+    return new Fn(new CommandInterpreter(ids));
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTracker.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTracker.java
new file mode 100644
index 0000000..d836094
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTracker.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.FunctionTrackerSnapshot;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+
+/**
+ * Keeps, per function instance, the counter value that the instance is expected to hold after all
+ * the commands applied so far have been processed.
+ */
+final class FunctionStateTracker {
+  private final long[] expectedStates;
+
+  public FunctionStateTracker(int numberOfFunctionInstances) {
+    this.expectedStates = new long[numberOfFunctionInstances];
+  }
+
+  /**
+   * Walks the commands nested under the given source command and records every state increment
+   * against the function instance that would execute it.
+   */
+  public void apply(SourceCommand sourceCommand) {
+    updateInternally(sourceCommand.getTarget(), sourceCommand.getCommands());
+  }
+
+  /**
+   * Adds the per-instance values stored in the snapshot to the currently tracked values.
+   *
+   * @return this tracker, to allow chaining.
+   */
+  public FunctionStateTracker apply(FunctionTrackerSnapshot snapshot) {
+    final int count = snapshot.getStateCount();
+    for (int index = 0; index < count; index++) {
+      expectedStates[index] += snapshot.getState(index);
+    }
+    return this;
+  }
+
+  /** Returns the expected state of the function instance with the given index. */
+  public long stateOf(int id) {
+    return expectedStates[id];
+  }
+
+  /** Copies the tracked values, in index order, into a new snapshot builder. */
+  public FunctionTrackerSnapshot.Builder snapshot() {
+    final FunctionTrackerSnapshot.Builder builder = FunctionTrackerSnapshot.newBuilder();
+    for (int index = 0; index < expectedStates.length; index++) {
+      builder.addState(expectedStates[index]);
+    }
+    return builder;
+  }
+
+  /**
+   * Recursively traverses the command tree looking for {@link Command.IncrementState} commands.
+   * Each one found increments the expected state of the address it would run at: {@code Send} and
+   * {@code SendAfter} switch to their target address, while the resolved commands of an {@code
+   * AsyncOperation} stay at the current address. Other commands are ignored.
+   */
+  private void updateInternally(int currentAddress, Commands commands) {
+    for (Command command : commands.getCommandList()) {
+      if (command.hasIncrement()) {
+        expectedStates[currentAddress]++;
+        continue;
+      }
+      if (command.hasSend()) {
+        final Command.Send send = command.getSend();
+        updateInternally(send.getTarget(), send.getCommands());
+        continue;
+      }
+      if (command.hasSendAfter()) {
+        final Command.SendAfter sendAfter = command.getSendAfter();
+        updateInternally(sendAfter.getTarget(), sendAfter.getCommands());
+        continue;
+      }
+      if (command.hasAsyncOperation()) {
+        updateInternally(currentAddress, command.getAsyncOperation().getResolvedCommands());
+      }
+    }
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Ids.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Ids.java
new file mode 100644
index 0000000..b9fbc9f
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Ids.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+/**
+ * A precomputed table that maps the integer addresses {@code 0 .. maxIds - 1} to their decimal
+ * string form, so that the string id of an address can be looked up without allocating.
+ */
+final class Ids {
+  private final String[] cache;
+
+  public Ids(int maxIds) {
+    this.cache = createIds(maxIds);
+  }
+
+  /** Returns the string id of the given address; the address must be within the table. */
+  public String idOf(int address) {
+    return cache[address];
+  }
+
+  /** Builds the lookup table: slot {@code i} holds the decimal representation of {@code i}. */
+  private static String[] createIds(int maxIds) {
+    final String[] table = new String[maxIds];
+    int next = 0;
+    while (next < table.length) {
+      table[next] = String.valueOf(next);
+      next++;
+    }
+    return table;
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Module.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Module.java
new file mode 100644
index 0000000..21db25b
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Module.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.Constants.IN;
+
+import com.google.auto.service.AutoService;
+import com.google.protobuf.Any;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
+import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wires the smoke-test application: the command source and its router, the function provider, a
+ * discarding egress for regular output and a socket egress for verification results.
+ */
+@AutoService(StatefulFunctionModule.class)
+public class Module implements StatefulFunctionModule {
+  public static final Logger LOG = LoggerFactory.getLogger(Module.class);
+
+  @Override
+  public void configure(Map<String, String> globalConfiguration, Binder binder) {
+    final ModuleParameters parameters = ModuleParameters.from(globalConfiguration);
+    LOG.info(parameters.toString());
+
+    final Ids ids = new Ids(parameters.getNumberOfFunctionInstances());
+
+    // the command source, the regular (discarded) output, and the routing of commands
+    binder.bindIngress(new SourceFunctionSpec<>(IN, new CommandFlinkSource(parameters)));
+    binder.bindEgress(new SinkFunctionSpec<>(Constants.OUT, new DiscardingSink<>()));
+    binder.bindIngressRouter(IN, new CommandRouter(ids));
+
+    // the function under test
+    binder.bindFunctionProvider(Constants.FN_TYPE, new FunctionProvider(ids));
+
+    // verification results leave the application through a socket
+    final SocketClientSink<Any> verificationSink = verificationSink(parameters);
+    binder.bindEgress(new SinkFunctionSpec<>(Constants.VERIFICATION_RESULT, verificationSink));
+  }
+
+  /** Creates the socket sink that ships verification results to the configured server. */
+  private static SocketClientSink<Any> verificationSink(ModuleParameters parameters) {
+    final int maxNumRetries = 3;
+    final boolean autoFlush = true;
+    return new SocketClientSink<>(
+        parameters.getVerificationServerHost(),
+        parameters.getVerificationServerPort(),
+        new VerificationResultSerializer(),
+        maxNumRetries,
+        autoFlush);
+  }
+
+  /** Serializes each element in protobuf's length-delimited form. */
+  private static final class VerificationResultSerializer implements SerializationSchema<Any> {
+
+    @Override
+    public byte[] serialize(Any element) {
+      // a few spare bytes on top of the payload to leave room for the length prefix
+      final ByteArrayOutputStream buffer =
+          new ByteArrayOutputStream(element.getSerializedSize() + 8);
+      try {
+        element.writeDelimitedTo(buffer);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return buffer.toByteArray();
+    }
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ModuleParameters.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ModuleParameters.java
new file mode 100644
index 0000000..0d2126e
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ModuleParameters.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Tunable parameters of the smoke-test module, exposed as a plain mutable bean.
+ *
+ * <p>Instances are converted from and to string maps with Jackson ({@link #from(Map)} and {@link
+ * #asMap()}), so the getter/setter names define the configuration keys. The field initializers
+ * below are the values a freshly constructed instance starts with.
+ */
+@SuppressWarnings("unused")
+public final class ModuleParameters implements Serializable {
+
+  private static final long serialVersionUID = 1;
+
+  // NOTE(review): the exact meaning of the generator-related knobs below (depth, counts and the
+  // *Pr values, presumably probabilities) is defined by their consumers, which are not part of
+  // this class -- confirm against CommandGenerator / CommandFlinkSource.
+  private int numberOfFunctionInstances = 1_000;
+  private int commandDepth = 10;
+  private int messageCount = 100_000;
+  private int maxCommandsPerDepth = 3;
+  private double stateModificationsPr = 0.4;
+  private double sendPr = 0.9;
+  private double sendAfterPr = 0.1;
+  private double asyncSendPr = 0.1;
+  private double noopPr = 0.2;
+  private double sendEgressPr = 0.03;
+  private int maxFailures = 1;
+  // Host and port handed to the socket sink that carries verification results.
+  private String verificationServerHost = "localhost";
+  private int verificationServerPort = 5050;
+
+  /**
+   * Creates an instance of ModuleParameters from a key-value map.
+   *
+   * <p>Keys that do not correspond to a property of this class are ignored rather than rejected.
+   *
+   * @param globalConfiguration the key-value pairs to convert.
+   * @return the parameters object produced by the conversion.
+   */
+  public static ModuleParameters from(Map<String, String> globalConfiguration) {
+    ObjectMapper mapper = new ObjectMapper();
+    // the map may carry entries unrelated to this class, so unknown keys must not fail
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    return mapper.convertValue(globalConfiguration, ModuleParameters.class);
+  }
+
+  /**
+   * Converts this object into a string-to-string map using Jackson.
+   *
+   * @return the map representation of this object's properties.
+   */
+  public Map<String, String> asMap() {
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.convertValue(this, new TypeReference<Map<String, String>>() {});
+  }
+
+  // ------------------------------------------------------------------------------------------
+  // Plain accessors; Jackson relies on these for the conversions above.
+  // ------------------------------------------------------------------------------------------
+
+  public int getNumberOfFunctionInstances() {
+    return numberOfFunctionInstances;
+  }
+
+  public void setNumberOfFunctionInstances(int numberOfFunctionInstances) {
+    this.numberOfFunctionInstances = numberOfFunctionInstances;
+  }
+
+  public int getCommandDepth() {
+    return commandDepth;
+  }
+
+  public void setCommandDepth(int commandDepth) {
+    this.commandDepth = commandDepth;
+  }
+
+  public int getMessageCount() {
+    return messageCount;
+  }
+
+  public void setMessageCount(int messageCount) {
+    this.messageCount = messageCount;
+  }
+
+  public int getMaxCommandsPerDepth() {
+    return maxCommandsPerDepth;
+  }
+
+  public void setMaxCommandsPerDepth(int maxCommandsPerDepth) {
+    this.maxCommandsPerDepth = maxCommandsPerDepth;
+  }
+
+  public double getStateModificationsPr() {
+    return stateModificationsPr;
+  }
+
+  public void setStateModificationsPr(double stateModificationsPr) {
+    this.stateModificationsPr = stateModificationsPr;
+  }
+
+  public double getSendPr() {
+    return sendPr;
+  }
+
+  public void setSendPr(double sendPr) {
+    this.sendPr = sendPr;
+  }
+
+  public double getSendAfterPr() {
+    return sendAfterPr;
+  }
+
+  public void setSendAfterPr(double sendAfterPr) {
+    this.sendAfterPr = sendAfterPr;
+  }
+
+  public double getAsyncSendPr() {
+    return asyncSendPr;
+  }
+
+  public void setAsyncSendPr(double asyncSendPr) {
+    this.asyncSendPr = asyncSendPr;
+  }
+
+  public double getNoopPr() {
+    return noopPr;
+  }
+
+  public void setNoopPr(double noopPr) {
+    this.noopPr = noopPr;
+  }
+
+  public double getSendEgressPr() {
+    return sendEgressPr;
+  }
+
+  public void setSendEgressPr(double sendEgressPr) {
+    this.sendEgressPr = sendEgressPr;
+  }
+
+  public void setMaxFailures(int maxFailures) {
+    this.maxFailures = maxFailures;
+  }
+
+  public int getMaxFailures() {
+    return maxFailures;
+  }
+
+  public String getVerificationServerHost() {
+    return verificationServerHost;
+  }
+
+  public void setVerificationServerHost(String verificationServerHost) {
+    this.verificationServerHost = verificationServerHost;
+  }
+
+  public int getVerificationServerPort() {
+    return verificationServerPort;
+  }
+
+  public void setVerificationServerPort(int verificationServerPort) {
+    this.verificationServerPort = verificationServerPort;
+  }
+
+  /** Renders every parameter as {@code name=value}; used when the module logs its setup. */
+  @Override
+  public String toString() {
+    return "ModuleParameters{"
+        + "numberOfFunctionInstances="
+        + numberOfFunctionInstances
+        + ", commandDepth="
+        + commandDepth
+        + ", messageCount="
+        + messageCount
+        + ", maxCommandsPerDepth="
+        + maxCommandsPerDepth
+        + ", stateModificationsPr="
+        + stateModificationsPr
+        + ", sendPr="
+        + sendPr
+        + ", sendAfterPr="
+        + sendAfterPr
+        + ", asyncSendPr="
+        + asyncSendPr
+        + ", noopPr="
+        + noopPr
+        + ", sendEgressPr="
+        + sendEgressPr
+        + ", maxFailures="
+        + maxFailures
+        + ", verificationServerHost='"
+        + verificationServerHost
+        + '\''
+        + ", verificationServerPort="
+        + verificationServerPort
+        + '}';
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ProtobufUtils.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ProtobufUtils.java
new file mode 100644
index 0000000..25aec2a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ProtobufUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+/** Static helpers for working with protobuf messages. */
+final class ProtobufUtils {
+  private ProtobufUtils() {}
+
+  /**
+   * Unpacks the given {@link Any} as a message of the requested type.
+   *
+   * @throws IllegalStateException if the payload cannot be unpacked as {@code messageType}; the
+   *     protobuf exception is kept as the cause.
+   */
+  public static <T extends Message> T unpack(Any any, Class<T> messageType) {
+    final T unpacked;
+    try {
+      unpacked = any.unpack(messageType);
+    } catch (InvalidProtocolBufferException cause) {
+      throw new IllegalStateException(cause);
+    }
+    return unpacked;
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/commands.proto b/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/commands.proto
new file mode 100644
index 0000000..e3912d1
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/commands.proto
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.e2e.smoke;
+option java_package = "org.apache.flink.statefun.e2e.smoke.generated";
+
+option java_multiple_files = true;
+
+// A command tree produced by the source, addressed to a single function instance.
+message SourceCommand {
+  // Index of the function instance that receives the commands; the router turns it into the
+  // instance id.
+  int32 target = 1;
+  // The commands for the target to process.
+  Commands commands = 2;
+}
+
+// A list of commands; commands that carry their own Commands make this a tree.
+message Commands {
+  repeated Command command = 1;
+}
+
+// A single instruction for a function instance. At most one variant of the oneof below is set.
+message Command {
+  // Increment the receiving function's persisted counter by one.
+  message IncrementState {
+  }
+  // Send the nested commands to the function instance addressed by `target`.
+  message Send {
+    int32 target = 1;
+    Commands commands = 2;
+  }
+  // Like Send, but delivered after a delay; the delay is not part of this message.
+  message SendAfter {
+    int32 target = 1;
+    Commands commands = 2;
+  }
+  // Emit an (empty) message to the output egress.
+  message SendEgress {
+  }
+  // Register an asynchronous operation with the runtime.
+  message AsyncOperation {
+    // Presumably true when the operation is meant to complete as a failure -- confirm against
+    // the interpreter.
+    bool failure = 1;
+    // Commands attached to the registered operation; the state tracker accounts for them at the
+    // address of the function that registered the operation.
+    Commands resolved_commands = 2;
+  }
+  // NOTE(review): presumably asks the function to compare its state with `expected` and report
+  // the outcome; the handler is not visible here -- confirm.
+  message Verify {
+    int64 expected = 1;
+  }
+
+  oneof command {
+    IncrementState increment = 1;
+    Send send = 2;
+    SendAfter send_after = 3;
+    SendEgress send_egress = 4;
+    AsyncOperation async_operation = 5;
+    Verify verify = 6;
+  }
+}
+
+// NOTE(review): presumably the outcome of a Verify command for one function instance -- the
+// producer of this message is not visible here; confirm the field meanings below.
+message VerificationResult {
+  // presumably the index of the verified function instance
+  int32 id = 1;
+  // presumably the state value the verifier expected
+  int64 expected = 2;
+  // presumably the state value the function actually held
+  int64 actual = 3;
+}
+
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/internal.proto b/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/internal.proto
new file mode 100644
index 0000000..138fdff
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/internal.proto
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.e2e.smoke;
+option java_package = "org.apache.flink.statefun.e2e.smoke.generated";
+option java_multiple_files = true;
+
+// Snapshot of the function state tracker: one expected state value per function instance, in
+// instance-index order.
+message FunctionTrackerSnapshot {
+  repeated int64 state = 1;
+}
+
+// NOTE(review): presumably the checkpointed state of the command source (its progress counters
+// plus the tracker snapshot); the code that reads and writes it is not visible here -- confirm.
+message SourceSnapshot {
+  int32 commandsSentSoFarHandle = 1;
+  int32 failuresGeneratedSoFar = 2;
+  FunctionTrackerSnapshot tracker = 3;
+}
+
+
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandGeneratorTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandGeneratorTest.java
new file mode 100644
index 0000000..49139cc
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandGeneratorTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.junit.Test;
+
+/** Shows how a {@link CommandGenerator} is constructed and used. */
+public class CommandGeneratorTest {
+
+  @Test
+  public void usageExample() {
+    CommandGenerator generator =
+        new CommandGenerator(new JDKRandomGenerator(), new ModuleParameters());
+
+    SourceCommand generated = generator.get();
+
+    assertThat(generated.getTarget(), notNullValue());
+    assertThat(generated.getCommands(), notNullValue());
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreterTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreterTest.java
new file mode 100644
index 0000000..1010666
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreterTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.Utils.aStateModificationCommand;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.protobuf.Any;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+import org.junit.Test;
+
+/** Shows how a {@link CommandInterpreter} applies a command to a function's state. */
+public class CommandInterpreterTest {
+
+  @Test
+  public void exampleUsage() {
+    PersistedValue<Long> counter = PersistedValue.of("state", Long.class);
+    CommandInterpreter interpreter = new CommandInterpreter(new Ids(10));
+    SourceCommand sourceCommand = aStateModificationCommand();
+
+    interpreter.interpret(counter, new MockContext(), Any.pack(sourceCommand));
+
+    assertThat(counter.get(), is(1L));
+  }
+
+  /** A context that ignores every outgoing message and has no addresses. */
+  private static final class MockContext implements Context {
+
+    @Override
+    public Address self() {
+      return null;
+    }
+
+    @Override
+    public Address caller() {
+      return null;
+    }
+
+    @Override
+    public void send(Address address, Object o) {}
+
+    @Override
+    public <T> void send(EgressIdentifier<T> egressIdentifier, T t) {}
+
+    @Override
+    public void sendAfter(Duration duration, Address address, Object o) {}
+
+    @Override
+    public <M, T> void registerAsyncOperation(M m, CompletableFuture<T> completableFuture) {}
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTrackerTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTrackerTest.java
new file mode 100644
index 0000000..97dba1f
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTrackerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.Utils.aRelayedStateModificationCommand;
+import static org.apache.flink.statefun.e2e.smoke.Utils.aStateModificationCommand;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.junit.Test;
+
+public class FunctionStateTrackerTest {
+
+  /** Applying three state modification commands to function 5 yields a tracked state of 3. */
+  @Test
+  public void exampleUsage() {
+    final int functionId = 5;
+    FunctionStateTracker underTest = new FunctionStateTracker(1_000);
+
+    for (int applied = 0; applied < 3; applied++) {
+      underTest.apply(aStateModificationCommand(functionId));
+    }
+
+    assertThat(underTest.stateOf(functionId), is(3L));
+  }
+
+  /**
+   * A state modification that is sent to function 5, wrapped in a send to function 6, is accounted
+   * to function 6 only.
+   */
+  @Test
+  public void testRelay() {
+    final int relay = 5;
+    final int finalTarget = 6;
+    FunctionStateTracker underTest = new FunctionStateTracker(1_000);
+
+    // a layered state increment message: addressed to the relay, which forwards
+    // the actual increment to the final target.
+    underTest.apply(aRelayedStateModificationCommand(relay, finalTarget));
+
+    assertThat(underTest.stateOf(relay), is(0L));
+    assertThat(underTest.stateOf(finalTarget), is(1L));
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/HarnessTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/HarnessTest.java
new file mode 100644
index 0000000..88864f8
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/HarnessTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.Utils.awaitVerificationSuccess;
+import static org.apache.flink.statefun.e2e.smoke.Utils.startProtobufServer;
+
+import com.google.protobuf.Any;
+import org.apache.flink.statefun.flink.harness.Harness;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HarnessTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HarnessTest.class);
+
+  /**
+   * Starts the smoke module in a {@link Harness} on a background thread and blocks until a
+   * successful verification result was received for every function instance.
+   */
+  @Ignore
+  @Test(timeout = 1_000 * 60 * 2)
+  public void miniClusterTest() throws Exception {
+    Harness harness = new Harness();
+    applyFlinkConfiguration(harness);
+
+    // start the Protobuf server; its port is handed to the module via the parameters below.
+    SimpleProtobufServer.StartedServer<Any> server = startProtobufServer();
+
+    // configure test parameters.
+    ModuleParameters parameters = new ModuleParameters();
+    parameters.setMaxFailures(1);
+    parameters.setMessageCount(100_000);
+    parameters.setNumberOfFunctionInstances(128);
+    parameters.setVerificationServerHost("localhost");
+    parameters.setVerificationServerPort(server.port());
+    parameters.asMap().forEach(harness::withGlobalConfiguration);
+
+    // run the harness; closing the handle interrupts the background thread.
+    try (AutoCloseable ignored = startHarnessInTheBackground(harness)) {
+      awaitVerificationSuccess(server.results(), parameters.getNumberOfFunctionInstances());
+    }
+
+    LOG.info("All done.");
+  }
+
+  /** Sets the Flink related configuration (restart strategy, checkpointing, parallelism). */
+  private static void applyFlinkConfiguration(Harness harness) {
+    harness.withConfiguration(
+        "classloader.parent-first-patterns.additional",
+        "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
+    harness.withConfiguration("restart-strategy", "fixed-delay");
+    harness.withConfiguration("restart-strategy.fixed-delay.attempts", "2147483647");
+    harness.withConfiguration("restart-strategy.fixed-delay.delay", "1sec");
+    harness.withConfiguration("execution.checkpointing.interval", "2sec");
+    harness.withConfiguration("execution.checkpointing.mode", "EXACTLY_ONCE");
+    harness.withConfiguration("execution.checkpointing.max-concurrent-checkpoints", "3");
+    harness.withConfiguration("parallelism.default", "2");
+    harness.withConfiguration("state.checkpoints.dir", "file:///tmp/checkpoints");
+  }
+
+  /**
+   * Runs {@code harness.start()} on a daemon thread named {@code harness-runner}.
+   *
+   * @return a handle that interrupts the background thread when closed.
+   */
+  private static AutoCloseable startHarnessInTheBackground(Harness harness) {
+    Runnable runHarness =
+        () -> {
+          try {
+            harness.start();
+          } catch (InterruptedException ignored) {
+            LOG.info("Harness Thread was interrupted. Exiting...");
+          } catch (Exception exception) {
+            LOG.info("Something happened while trying to run the Harness.", exception);
+          }
+        };
+    Thread runner = new Thread(runHarness, "harness-runner");
+    runner.setDaemon(true);
+    runner.start();
+    return runner::interrupt;
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/ModuleParametersTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/ModuleParametersTest.java
new file mode 100644
index 0000000..be5a9b5
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/ModuleParametersTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+import java.util.Map;
+import org.junit.Test;
+
+public class ModuleParametersTest {
+
+  /** A {@code messageCount} entry of the source map is exposed through the typed getter. */
+  @Test
+  public void exampleUsage() {
+    Map<String, String> rawConfiguration = Collections.singletonMap("messageCount", "1");
+
+    ModuleParameters parsed = ModuleParameters.from(rawConfiguration);
+
+    assertThat(parsed.getMessageCount(), is(1));
+  }
+
+  /** A value set on the parameters survives {@code asMap()} followed by {@code from(...)}. */
+  @Test
+  public void roundTrip() {
+    final int commandDepth = 1234;
+    ModuleParameters source = new ModuleParameters();
+    source.setCommandDepth(commandDepth);
+
+    Map<String, String> serialized = source.asMap();
+    ModuleParameters restored = ModuleParameters.from(serialized);
+
+    assertThat(restored.getCommandDepth(), is(commandDepth));
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SimpleProtobufServer.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SimpleProtobufServer.java
new file mode 100644
index 0000000..58e18bf
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SimpleProtobufServer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple threaded TCP server that is able to receive a specific Protocol Buffers message type.
+ *
+ * @param <T> input message type.
+ */
+@ThreadSafe
+public final class SimpleProtobufServer<T extends Message> {
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleProtobufServer.class);
+
+  /** Every message received from any client, in arrival order. */
+  private final LinkedBlockingDeque<T> results = new LinkedBlockingDeque<>();
+
+  private final ExecutorService executor;
+  private final AtomicBoolean started = new AtomicBoolean(false);
+  private final Parser<T> parser;
+
+  /**
+   * Creates a server that is not yet listening.
+   *
+   * @param parser the parser used to decode the length-delimited messages sent by the clients.
+   */
+  public SimpleProtobufServer(Parser<T> parser) {
+    this.executor = MoreExecutors.newCachedDaemonThreadPool();
+    this.parser = parser;
+  }
+
+  /**
+   * Binds a server socket to an ephemeral port and starts accepting clients in the background.
+   *
+   * @return a handle exposing the bound port and a blocking supplier of the received messages.
+   * @throws IllegalArgumentException if this server was already started.
+   * @throws IllegalStateException if the server socket could not be bound.
+   */
+  StartedServer<T> start() {
+    if (!started.compareAndSet(false, true)) {
+      throw new IllegalArgumentException("Already started.");
+    }
+    ServerSocket serverSocket = null;
+    try {
+      // SO_REUSEADDR has to be configured before the socket is bound, hence the socket is
+      // created unbound and bound explicitly afterwards.
+      serverSocket = new ServerSocket();
+      serverSocket.setReuseAddress(true);
+      serverSocket.bind(new java.net.InetSocketAddress(0));
+
+      final ServerSocket boundSocket = serverSocket;
+      final int port = boundSocket.getLocalPort();
+      LOG.info("Starting server at " + port);
+      executor.submit(() -> acceptClients(boundSocket));
+      return new StartedServer<>(port, results());
+    } catch (IOException e) {
+      // do not leak the socket if it could not be configured or bound.
+      if (serverSocket != null) {
+        IOUtils.closeQuietly(serverSocket);
+      }
+      throw new IllegalStateException("Unable to bind the TCP server.", e);
+    }
+  }
+
+  /** Returns a supplier that blocks until the next received message is available. */
+  private Supplier<T> results() {
+    return () -> {
+      try {
+        return results.take();
+      } catch (InterruptedException e) {
+        // keep the interruption visible to the caller's thread.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  /** Accepts clients forever, handing each connection over to its own reader task. */
+  @SuppressWarnings("InfiniteLoopStatement")
+  private void acceptClients(ServerSocket serverSocket) {
+    while (true) {
+      try {
+        startPumping(serverSocket.accept());
+      } catch (IOException e) {
+        LOG.info("Exception while trying to accept a connection.", e);
+      }
+    }
+  }
+
+  /** Schedules the reader task of an accepted client, closing the client if that fails. */
+  private void startPumping(Socket client) throws IOException {
+    try {
+      InputStream input = client.getInputStream();
+      executor.submit(() -> pumpVerificationResults(client, input));
+    } catch (IOException | RuntimeException e) {
+      IOUtils.closeQuietly(client);
+      throw e;
+    }
+  }
+
+  /**
+   * Reads length-delimited messages from the client until the stream ends or fails, and always
+   * closes the client socket afterwards.
+   */
+  private void pumpVerificationResults(Socket client, InputStream input) {
+    try {
+      T result;
+      // parseDelimitedFrom returns null once the stream is at EOF; stop reading instead of
+      // spinning on a connection that the client has already closed.
+      while ((result = parser.parseDelimitedFrom(input)) != null) {
+        results.add(result);
+      }
+      LOG.info("Client " + client.getRemoteSocketAddress() + " disconnected, bye...");
+    } catch (IOException e) {
+      LOG.info(
+          "Exception reading a verification result from "
+              + client.getRemoteSocketAddress()
+              + ", bye...",
+          e);
+    } finally {
+      IOUtils.closeQuietly(client);
+    }
+  }
+
+  /** The bound port of a started server together with a blocking supplier of its results. */
+  public static final class StartedServer<T extends Message> {
+    private final int port;
+    private final Supplier<T> results;
+
+    public StartedServer(int port, Supplier<T> results) {
+      this.port = port;
+      this.results = results;
+    }
+
+    public int port() {
+      return port;
+    }
+
+    public Supplier<T> results() {
+      return results;
+    }
+  }
+
+  private static final class MoreExecutors {
+
+    /** Creates a cached thread pool whose threads are all daemon threads. */
+    static ExecutorService newCachedDaemonThreadPool() {
+      return Executors.newCachedThreadPool(
+          r -> {
+            Thread t = new Thread(r);
+            t.setDaemon(true);
+            return t;
+          });
+    }
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
new file mode 100644
index 0000000..9f2065e
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.Utils.awaitVerificationSuccess;
+import static org.apache.flink.statefun.e2e.smoke.Utils.startProtobufServer;
+
+import com.google.protobuf.Any;
+import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.Testcontainers;
+
+public final class SmokeRunner {
+  private static final Logger LOG = LoggerFactory.getLogger(SmokeRunner.class);
+
+  /**
+   * Starts a verification server on the host, launches the {@code smoke} application containers
+   * configured with the given parameters, and blocks until a successful verification result was
+   * received for every function instance.
+   *
+   * @param parameters the test parameters; the verification server host and port are overwritten.
+   */
+  public static void run(ModuleParameters parameters) throws Throwable {
+    SimpleProtobufServer.StartedServer<Any> verificationServer = startProtobufServer();
+    final int verificationPort = verificationServer.port();
+
+    parameters.setVerificationServerHost("host.testcontainers.internal");
+    parameters.setVerificationServerPort(verificationPort);
+
+    StatefulFunctionsAppContainers.Builder appBuilder =
+        StatefulFunctionsAppContainers.builder("smoke", 2);
+    appBuilder.exposeMasterLogs(LOG);
+
+    // the test module parameters travel as global configurations, so that
+    // they can be deserialized at Module#configure()
+    parameters.asMap().forEach(appBuilder::withModuleGlobalConfiguration);
+
+    // run the test
+    Testcontainers.exposeHostPorts(verificationPort);
+    StatefulFunctionsAppContainers app = appBuilder.build();
+
+    ThrowingRunnable<Throwable> verification =
+        () ->
+            awaitVerificationSuccess(
+                verificationServer.results(), parameters.getNumberOfFunctionInstances());
+    run(app, verification);
+  }
+
+  /** Evaluates {@code r} wrapped by the statement that {@code app.apply(...)} returns. */
+  private static void run(StatefulFunctionsAppContainers app, ThrowingRunnable<Throwable> r)
+      throws Throwable {
+    Statement body =
+        new Statement() {
+          @Override
+          public void evaluate() throws Throwable {
+            r.run();
+          }
+        };
+
+    Statement wrapped = app.apply(body, Description.EMPTY);
+    wrapped.evaluate();
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeVerificationE2E.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeVerificationE2E.java
new file mode 100644
index 0000000..f1b5b6c
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeVerificationE2E.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import org.junit.Test;
+
+public class SmokeVerificationE2E {
+
+  /**
+   * Runs the smoke test with 128 function instances, 100,000 messages and the maximum number of
+   * failures set to 1.
+   */
+  @Test(timeout = 1_000 * 60 * 10)
+  public void runWith() throws Throwable {
+    final ModuleParameters smokeParameters = new ModuleParameters();
+    smokeParameters.setMaxFailures(1);
+    smokeParameters.setMessageCount(100_000);
+    smokeParameters.setNumberOfFunctionInstances(128);
+
+    SmokeRunner.run(smokeParameters);
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/Utils.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/Utils.java
new file mode 100644
index 0000000..85f527d
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/Utils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.e2e.smoke;
+
+import com.google.protobuf.Any;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
+
+class Utils {
+
+  /** Creates a command that increments the state of an arbitrary function instance. */
+  public static SourceCommand aStateModificationCommand() {
+    // the id doesn't matter
+    final int anyFunctionInstanceId = -1234;
+    return aStateModificationCommand(anyFunctionInstanceId);
+  }
+
+  /** Creates a command that increments the state of the given function instance. */
+  public static SourceCommand aStateModificationCommand(int functionInstanceId) {
+    Commands.Builder commands = Commands.newBuilder().addCommand(modify());
+    return SourceCommand.newBuilder().setTarget(functionInstanceId).setCommands(commands).build();
+  }
+
+  /**
+   * Creates a command addressed to {@code firstFunctionId}, instructing it to send a state
+   * increment to {@code secondFunctionId}.
+   */
+  public static SourceCommand aRelayedStateModificationCommand(
+      int firstFunctionId, int secondFunctionId) {
+    Command.Builder relayedIncrement = sendTo(secondFunctionId, modify());
+    Commands.Builder commands = Commands.newBuilder().addCommand(relayedIncrement);
+    return SourceCommand.newBuilder().setTarget(firstFunctionId).setCommands(commands).build();
+  }
+
+  /** Wraps {@code body} in a send command targeting the function with the given id. */
+  private static Command.Builder sendTo(int id, Command.Builder body) {
+    Commands.Builder payload = Commands.newBuilder().addCommand(body);
+    Command.Send.Builder send = Command.Send.newBuilder().setTarget(id).setCommands(payload);
+    return Command.newBuilder().setSend(send);
+  }
+
+  /** Creates a single state increment command. */
+  private static Command.Builder modify() {
+    return Command.newBuilder().setIncrement(Command.IncrementState.getDefaultInstance());
+  }
+
+  /**
+   * Blocks the currently executing thread until a successful verification result has been supplied
+   * for {@code numberOfFunctionInstances} distinct function ids.
+   *
+   * @throws AssertionError if any result reports an actual value greater than the expected one.
+   */
+  static void awaitVerificationSuccess(Supplier<Any> results, final int numberOfFunctionInstances) {
+    Set<Integer> verifiedIds = new HashSet<>();
+    while (verifiedIds.size() != numberOfFunctionInstances) {
+      VerificationResult result = ProtobufUtils.unpack(results.get(), VerificationResult.class);
+      if (result.getActual() > result.getExpected()) {
+        throw new AssertionError(
+            "Over counted. Expected: "
+                + result.getExpected()
+                + ", actual: "
+                + result.getActual()
+                + ", function: "
+                + result.getId());
+      }
+      // a result that is still below the expected value is skipped.
+      if (result.getActual() == result.getExpected()) {
+        verifiedIds.add(result.getId());
+      }
+    }
+  }
+
+  /** Starts a simple Protobuf TCP server that accepts {@link com.google.protobuf.Any}. */
+  static SimpleProtobufServer.StartedServer<Any> startProtobufServer() {
+    return new SimpleProtobufServer<>(Any.parser()).start();
+  }
+}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/Dockerfile
new file mode 100644
index 0000000..3166768
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/Dockerfile
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Build on top of the flink-statefun image, tagged 2.3-SNAPSHOT.
+FROM flink-statefun:2.3-SNAPSHOT
+
+# Place the smoke e2e module jar in its own directory under /opt/statefun/modules.
+RUN mkdir -p /opt/statefun/modules/statefun-smoke-e2e
+COPY statefun-smoke-e2e*.jar /opt/statefun/modules/statefun-smoke-e2e/
+# Use the flink-conf.yaml from the build context as the Flink configuration of the image.
+COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb965d3
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+# Log all messages at INFO level and above to the console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
index 694e52d..ea5ce29 100644
--- a/tools/maven/spotbugs-exclude.xml
+++ b/tools/maven/spotbugs-exclude.xml
@@ -101,5 +101,8 @@ under the License.
         <Package name="~org\.apache\.flink\.statefun\.examples\.ridesharing\.simulator.*" />
     </Match>
 
+    <!-- Exclude every package whose name starts with org.apache.flink.statefun.e2e.smoke. -->
+    <Match>
+        <Package name="~org\.apache\.flink\.statefun\.e2e\.smoke.*" />
+    </Match>
 
 </FindBugsFilter>