You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/21 07:02:55 UTC

[flink-statefun] 07/08: [FLINK-17518] [e2e] Remove routable Kafka E2E test

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 2abbff1b0036fa3099cccdb13330e97078bd4d97
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:32:26 2020 +0800

    [FLINK-17518] [e2e] Remove routable Kafka E2E test
    
    This E2E can be removed since it is completely subsumed by the new
    remote module E2E.
    
    This closes #115.
---
 statefun-e2e-tests/pom.xml                         |   1 -
 .../statefun-routable-kafka-e2e/pom.xml            | 102 ---------------
 .../statefun/e2e/routablekafka/Constants.java      |  38 ------
 .../e2e/routablekafka/FnSelfAddressTagger.java     |  56 --------
 .../flink/statefun/e2e/routablekafka/KafkaIO.java  |  58 ---------
 .../RoutableKafkaVerificationModule.java           |  56 --------
 .../protobuf/routable-kafka-verification.proto     |  41 ------
 .../e2e/routablekafka/RoutableKafkaE2E.java        | 143 ---------------------
 .../src/test/resources/Dockerfile                  |  21 ---
 .../src/test/resources/log4j.properties            |  24 ----
 .../routable-kafka-ingress-module/module.yaml      |  41 ------
 11 files changed, 581 deletions(-)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 9456ed1..ebb297e 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -31,7 +31,6 @@ under the License.
     <modules>
         <module>statefun-e2e-tests-common</module>
         <module>statefun-sanity-e2e</module>
-        <module>statefun-routable-kafka-e2e</module>
         <module>statefun-remote-module-e2e</module>
         <module>statefun-exactly-once-e2e</module>
     </modules>
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml b/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml
deleted file mode 100644
index 7d7d4a4..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml
+++ /dev/null
@@ -1,102 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xmlns="http://maven.apache.org/POM/4.0.0"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>statefun-e2e-tests</artifactId>
-        <groupId>org.apache.flink</groupId>
-        <version>2.1-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>statefun-routable-kafka-e2e</artifactId>
-
-    <properties>
-        <testcontainers.version>1.12.5</testcontainers.version>
-    </properties>
-
-    <dependencies>
-        <!-- Stateful Functions -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-sdk</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-kafka-io</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <!-- Protobuf -->
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>${protobuf.version}</version>
-        </dependency>
-
-        <!-- logging -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.15</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>1.2.17</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- End-to-end test common -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-e2e-tests-common</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- Testcontainers KafkaContainer -->
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>kafka</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>com.github.os72</groupId>
-                <artifactId>protoc-jar-maven-plugin</artifactId>
-                <version>${protoc-jar-maven-plugin.version}</version>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-javadoc-plugin</artifactId>
-                <configuration>
-                    <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java
deleted file mode 100644
index 6e3c07e..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-
-final class Constants {
-
-  private Constants() {}
-
-  static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
-
-  static final EgressIdentifier<MessageWithAddress> EGRESS_ID =
-      new EgressIdentifier<>(
-          "org.apache.flink.e2e.routablekafka", "tagged-messages", MessageWithAddress.class);
-
-  static final String FUNCTION_NAMESPACE = "org.apache.flink.e2e.routablekafka";
-  static final FunctionType FUNCTION_TYPE_ONE = new FunctionType(FUNCTION_NAMESPACE, "t0");
-  static final FunctionType FUNCTION_TYPE_TWO = new FunctionType(FUNCTION_NAMESPACE, "t1");
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java
deleted file mode 100644
index bec943e..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import com.google.protobuf.Any;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.FnAddress;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.Address;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public final class FnSelfAddressTagger implements StatefulFunction {
-
-  @Override
-  public void invoke(Context context, Object input) {
-    MessageWithAddress message = cast(input);
-    context.send(Constants.EGRESS_ID, tagWithSelfAddress(message, context));
-  }
-
-  private static MessageWithAddress cast(Object input) {
-    Any any = (Any) input;
-    try {
-      return any.unpack(MessageWithAddress.class);
-    } catch (Exception e) {
-      throw new RuntimeException("Unable to unpack input as MessageWithAddress.", e);
-    }
-  }
-
-  private MessageWithAddress tagWithSelfAddress(MessageWithAddress original, Context context) {
-    return original.toBuilder().setFrom(fnAddress(context.self())).build();
-  }
-
-  private FnAddress fnAddress(Address sdkAddress) {
-    return FnAddress.newBuilder()
-        .setNamespace(sdkAddress.type().namespace())
-        .setType(sdkAddress.type().name())
-        .setId(sdkAddress.id())
-        .build();
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java
deleted file mode 100644
index 59e3128..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import java.util.Objects;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.io.EgressSpec;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-final class KafkaIO {
-
-  static final String TAGGED_MESSAGES_TOPIC_NAME = "tagged-messages";
-
-  private final String kafkaAddress;
-
-  KafkaIO(String kafkaAddress) {
-    this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
-  }
-
-  EgressSpec<MessageWithAddress> getEgressSpec() {
-    return KafkaEgressBuilder.forIdentifier(Constants.EGRESS_ID)
-        .withKafkaAddress(kafkaAddress)
-        .withSerializer(TaggedMessageKafkaSerializer.class)
-        .build();
-  }
-
-  private static final class TaggedMessageKafkaSerializer
-      implements KafkaEgressSerializer<MessageWithAddress> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public ProducerRecord<byte[], byte[]> serialize(MessageWithAddress taggedMessages) {
-      final byte[] key = taggedMessages.getFrom().getIdBytes().toByteArray();
-      final byte[] value = taggedMessages.toByteArray();
-
-      return new ProducerRecord<>(TAGGED_MESSAGES_TOPIC_NAME, key, value);
-    }
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java
deleted file mode 100644
index d9896c8..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-
-/**
- * This is a a simple application used for testing the routable Kafka ingress.
- *
- * <p>The application reads untagged messages from a Kafka ingress, binds multiple functions which
- * has the sole purpose of tagging of messages with their own addresses (see {@link
- * FnSelfAddressTagger}), and then sending back the tagged messages back to a Kafka egress.
- */
-@AutoService(StatefulFunctionModule.class)
-public class RoutableKafkaVerificationModule implements StatefulFunctionModule {
-
-  @Override
-  public void configure(Map<String, String> globalConfiguration, Binder binder) {
-    String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
-    if (kafkaBootstrapServers == null) {
-      throw new IllegalStateException(
-          "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
-    }
-
-    configureKafkaIO(kafkaBootstrapServers, binder);
-    configureAddressTaggerFunctions(binder);
-  }
-
-  private static void configureKafkaIO(String kafkaAddress, Binder binder) {
-    final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
-    binder.bindEgress(kafkaIO.getEgressSpec());
-  }
-
-  private static void configureAddressTaggerFunctions(Binder binder) {
-    binder.bindFunctionProvider(Constants.FUNCTION_TYPE_ONE, ignored -> new FnSelfAddressTagger());
-    binder.bindFunctionProvider(Constants.FUNCTION_TYPE_TWO, ignored -> new FnSelfAddressTagger());
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto
deleted file mode 100644
index a55186b..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = "proto3";
-
-package org.apache.flink.statefun.e2e.routablekafka;
-option java_package = "org.apache.flink.statefun.e2e.routablekafka.generated";
-option java_multiple_files = false;
-
-/*
- * A command is addressed to a specific target funcction and triggers some action by that target.
- * Commands can be nested to an arbitrary depth.
- */
-message MessageWithAddress {
-    FnAddress from = 1;
-    string message = 2;
-}
-
-/*
- * Target function address of commands.
- */
-message FnAddress {
-    string namespace = 1;
-    string type = 2;
-    string id = 3;
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
deleted file mode 100644
index 3869163..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import java.util.Collections;
-import java.util.Properties;
-import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.FnAddress;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-
-/**
- * End-to-end test based on the {@link RoutableKafkaVerificationModule} application.
- *
- * <p>This test writes some records to Kafka, with target function id as key (UTF8 String) and
- * {@link MessageWithAddress} messages as value, without the {@code from} field set. The routable
- * Kafka ingress should automatically route them to the correct function instances, which tag the
- * input messages with their own address, and then forwards it back to Kafka (see {@link
- * FnSelfAddressTagger} function). The test verifies that the tagged outputs written back to Kafka
- * are correct.
- */
-public class RoutableKafkaE2E {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RoutableKafkaE2E.class);
-
-  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
-  private static final String KAFKA_HOST = "kafka-broker";
-
-  @Rule
-  public KafkaContainer kafka =
-      new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
-
-  @Rule
-  public StatefulFunctionsAppContainers verificationApp =
-      StatefulFunctionsAppContainers.builder("routable-kafka-verification", 1)
-          .dependsOn(kafka)
-          .exposeMasterLogs(LOG)
-          .withBuildContextFileFromClasspath(
-              "routable-kafka-ingress-module", "/routable-kafka-ingress-module/")
-          .withModuleGlobalConfiguration(
-              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
-          .build();
-
-  @Test(timeout = 60_000L)
-  public void run() {
-    final String kafkaAddress = kafka.getBootstrapServers();
-
-    final Producer<String, MessageWithAddress> messageProducer =
-        kafkaKeyedMessagesProducer(kafkaAddress);
-    final Consumer<String, MessageWithAddress> taggedMessageConsumer =
-        kafkaTaggedMessagesConsumer(kafkaAddress);
-
-    final KafkaIOVerifier<String, MessageWithAddress, String, MessageWithAddress> verifier =
-        new KafkaIOVerifier<>(messageProducer, taggedMessageConsumer);
-
-    assertThat(
-        verifier.sending(
-            producerRecord("messages-1", "key-1", message("foo")),
-            producerRecord("messages-1", "key-2", message("bar")),
-            producerRecord("messages-2", "key-1", message("hello"))),
-        verifier.resultsInOrder(
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t0", "key-1"), "foo")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-1"), "foo")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t0", "key-2"), "bar")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-2"), "bar")),
-            is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-1"), "hello"))));
-  }
-
-  private static Producer<String, MessageWithAddress> kafkaKeyedMessagesProducer(
-      String bootstrapServers) {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", bootstrapServers);
-
-    return new KafkaProducer<>(
-        props, new StringSerializer(), new KafkaProtobufSerializer<>(MessageWithAddress.parser()));
-  }
-
-  private Consumer<String, MessageWithAddress> kafkaTaggedMessagesConsumer(
-      String bootstrapServers) {
-    Properties consumerProps = new Properties();
-    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
-    consumerProps.setProperty("group.id", "routable-kafka");
-    consumerProps.setProperty("auto.offset.reset", "earliest");
-
-    KafkaConsumer<String, MessageWithAddress> consumer =
-        new KafkaConsumer<>(
-            consumerProps,
-            new StringDeserializer(),
-            new KafkaProtobufSerializer<>(MessageWithAddress.parser()));
-    consumer.subscribe(Collections.singletonList(KafkaIO.TAGGED_MESSAGES_TOPIC_NAME));
-
-    return consumer;
-  }
-
-  private static ProducerRecord<String, MessageWithAddress> producerRecord(
-      String topic, String key, MessageWithAddress message) {
-    return new ProducerRecord<>(topic, key, message);
-  }
-
-  private static MessageWithAddress message(String message) {
-    return MessageWithAddress.newBuilder().setMessage(message).build();
-  }
-
-  private static MessageWithAddress taggedMessage(FnAddress fromTag, String message) {
-    return MessageWithAddress.newBuilder().setFrom(fromTag).setMessage(message).build();
-  }
-
-  private static FnAddress fnAddress(String namespace, String type, String id) {
-    return FnAddress.newBuilder().setNamespace(namespace).setType(type).setId(id).build();
-  }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile
deleted file mode 100644
index 7210f89..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM flink-statefun:2.1-SNAPSHOT
-
-RUN mkdir -p /opt/statefun/modules/statefun-routable-kafka-e2e
-COPY statefun-routable-kafka-e2e*.jar /opt/statefun/modules/statefun-routable-kafka-e2e/
-COPY routable-kafka-ingress-module/ /opt/statefun/modules/statefun-routable-kafka-e2e/
-COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties
deleted file mode 100644
index fb965d3..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml
deleted file mode 100644
index 98fee69..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: "1.0"
-
-module:
-  meta:
-    type: remote
-  spec:
-    ingresses:
-      - ingress:
-          meta:
-            type: statefun.kafka.io/routable-protobuf-ingress
-            id: org.apache.flink.statefun.e2e/messages
-          spec:
-            address: kafka-broker:9092
-            consumerGroupId: routable-kafka-e2e
-            startupPosition:
-              type: earliest
-            topics:
-              - topic: messages-1
-                typeUrl: com.googleapis/org.apache.flink.statefun.e2e.routablekafka.MessageWithAddress
-                targets:
-                  - org.apache.flink.e2e.routablekafka/t0
-                  - org.apache.flink.e2e.routablekafka/t1
-              - topic: messages-2
-                typeUrl: com.googleapis/org.apache.flink.statefun.e2e.routablekafka.MessageWithAddress
-                targets:
-                  - org.apache.flink.e2e.routablekafka/t1