You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/21 07:02:55 UTC
[flink-statefun] 07/08: [FLINK-17518] [e2e] Remove routable Kafka E2E test
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 2abbff1b0036fa3099cccdb13330e97078bd4d97
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 20 19:32:26 2020 +0800
[FLINK-17518] [e2e] Remove routable Kafka E2E test
This E2E can be removed since it is completely subsumed by the new
remote module E2E.
This closes #115.
---
statefun-e2e-tests/pom.xml | 1 -
.../statefun-routable-kafka-e2e/pom.xml | 102 ---------------
.../statefun/e2e/routablekafka/Constants.java | 38 ------
.../e2e/routablekafka/FnSelfAddressTagger.java | 56 --------
.../flink/statefun/e2e/routablekafka/KafkaIO.java | 58 ---------
.../RoutableKafkaVerificationModule.java | 56 --------
.../protobuf/routable-kafka-verification.proto | 41 ------
.../e2e/routablekafka/RoutableKafkaE2E.java | 143 ---------------------
.../src/test/resources/Dockerfile | 21 ---
.../src/test/resources/log4j.properties | 24 ----
.../routable-kafka-ingress-module/module.yaml | 41 ------
11 files changed, 581 deletions(-)
diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 9456ed1..ebb297e 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -31,7 +31,6 @@ under the License.
<modules>
<module>statefun-e2e-tests-common</module>
<module>statefun-sanity-e2e</module>
- <module>statefun-routable-kafka-e2e</module>
<module>statefun-remote-module-e2e</module>
<module>statefun-exactly-once-e2e</module>
</modules>
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml b/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml
deleted file mode 100644
index 7d7d4a4..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/pom.xml
+++ /dev/null
@@ -1,102 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>statefun-e2e-tests</artifactId>
- <groupId>org.apache.flink</groupId>
- <version>2.1-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>statefun-routable-kafka-e2e</artifactId>
-
- <properties>
- <testcontainers.version>1.12.5</testcontainers.version>
- </properties>
-
- <dependencies>
- <!-- Stateful Functions -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-sdk</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-kafka-io</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <!-- Protobuf -->
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>
-
- <!-- logging -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.15</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- <scope>test</scope>
- </dependency>
-
- <!-- End-to-end test common -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-e2e-tests-common</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- Testcontainers KafkaContainer -->
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>kafka</artifactId>
- <version>${testcontainers.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>com.github.os72</groupId>
- <artifactId>protoc-jar-maven-plugin</artifactId>
- <version>${protoc-jar-maven-plugin.version}</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <configuration>
- <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java
deleted file mode 100644
index 6e3c07e..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/Constants.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-
-final class Constants {
-
- private Constants() {}
-
- static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
-
- static final EgressIdentifier<MessageWithAddress> EGRESS_ID =
- new EgressIdentifier<>(
- "org.apache.flink.e2e.routablekafka", "tagged-messages", MessageWithAddress.class);
-
- static final String FUNCTION_NAMESPACE = "org.apache.flink.e2e.routablekafka";
- static final FunctionType FUNCTION_TYPE_ONE = new FunctionType(FUNCTION_NAMESPACE, "t0");
- static final FunctionType FUNCTION_TYPE_TWO = new FunctionType(FUNCTION_NAMESPACE, "t1");
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java
deleted file mode 100644
index bec943e..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/FnSelfAddressTagger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import com.google.protobuf.Any;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.FnAddress;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.Address;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public final class FnSelfAddressTagger implements StatefulFunction {
-
- @Override
- public void invoke(Context context, Object input) {
- MessageWithAddress message = cast(input);
- context.send(Constants.EGRESS_ID, tagWithSelfAddress(message, context));
- }
-
- private static MessageWithAddress cast(Object input) {
- Any any = (Any) input;
- try {
- return any.unpack(MessageWithAddress.class);
- } catch (Exception e) {
- throw new RuntimeException("Unable to unpack input as MessageWithAddress.", e);
- }
- }
-
- private MessageWithAddress tagWithSelfAddress(MessageWithAddress original, Context context) {
- return original.toBuilder().setFrom(fnAddress(context.self())).build();
- }
-
- private FnAddress fnAddress(Address sdkAddress) {
- return FnAddress.newBuilder()
- .setNamespace(sdkAddress.type().namespace())
- .setType(sdkAddress.type().name())
- .setId(sdkAddress.id())
- .build();
- }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java
deleted file mode 100644
index 59e3128..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/KafkaIO.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import java.util.Objects;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.flink.statefun.sdk.io.EgressSpec;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-final class KafkaIO {
-
- static final String TAGGED_MESSAGES_TOPIC_NAME = "tagged-messages";
-
- private final String kafkaAddress;
-
- KafkaIO(String kafkaAddress) {
- this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
- }
-
- EgressSpec<MessageWithAddress> getEgressSpec() {
- return KafkaEgressBuilder.forIdentifier(Constants.EGRESS_ID)
- .withKafkaAddress(kafkaAddress)
- .withSerializer(TaggedMessageKafkaSerializer.class)
- .build();
- }
-
- private static final class TaggedMessageKafkaSerializer
- implements KafkaEgressSerializer<MessageWithAddress> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public ProducerRecord<byte[], byte[]> serialize(MessageWithAddress taggedMessages) {
- final byte[] key = taggedMessages.getFrom().getIdBytes().toByteArray();
- final byte[] value = taggedMessages.toByteArray();
-
- return new ProducerRecord<>(TAGGED_MESSAGES_TOPIC_NAME, key, value);
- }
- }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java
deleted file mode 100644
index d9896c8..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaVerificationModule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-
-/**
- * This is a simple application used for testing the routable Kafka ingress.
- *
- * <p>The application reads untagged messages from a Kafka ingress, binds multiple functions whose
- * sole purpose is to tag messages with their own addresses (see {@link
- * FnSelfAddressTagger}), and then sends the tagged messages back to a Kafka egress.
- */
-@AutoService(StatefulFunctionModule.class)
-public class RoutableKafkaVerificationModule implements StatefulFunctionModule {
-
- @Override
- public void configure(Map<String, String> globalConfiguration, Binder binder) {
- String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
- if (kafkaBootstrapServers == null) {
- throw new IllegalStateException(
- "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
- }
-
- configureKafkaIO(kafkaBootstrapServers, binder);
- configureAddressTaggerFunctions(binder);
- }
-
- private static void configureKafkaIO(String kafkaAddress, Binder binder) {
- final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
- binder.bindEgress(kafkaIO.getEgressSpec());
- }
-
- private static void configureAddressTaggerFunctions(Binder binder) {
- binder.bindFunctionProvider(Constants.FUNCTION_TYPE_ONE, ignored -> new FnSelfAddressTagger());
- binder.bindFunctionProvider(Constants.FUNCTION_TYPE_TWO, ignored -> new FnSelfAddressTagger());
- }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto
deleted file mode 100644
index a55186b..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/main/protobuf/routable-kafka-verification.proto
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = "proto3";
-
-package org.apache.flink.statefun.e2e.routablekafka;
-option java_package = "org.apache.flink.statefun.e2e.routablekafka.generated";
-option java_multiple_files = false;
-
-/*
- * A command is addressed to a specific target function and triggers some action by that target.
- * Commands can be nested to an arbitrary depth.
- */
-message MessageWithAddress {
- FnAddress from = 1;
- string message = 2;
-}
-
-/*
- * Target function address of commands.
- */
-message FnAddress {
- string namespace = 1;
- string type = 2;
- string id = 3;
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
deleted file mode 100644
index 3869163..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.routablekafka;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import java.util.Collections;
-import java.util.Properties;
-import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.FnAddress;
-import org.apache.flink.statefun.e2e.routablekafka.generated.RoutableKafkaVerification.MessageWithAddress;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-
-/**
- * End-to-end test based on the {@link RoutableKafkaVerificationModule} application.
- *
- * <p>This test writes some records to Kafka, with target function id as key (UTF8 String) and
- * {@link MessageWithAddress} messages as value, without the {@code from} field set. The routable
- * Kafka ingress should automatically route them to the correct function instances, which tag the
- * input messages with their own address, and then forwards it back to Kafka (see {@link
- * FnSelfAddressTagger} function). The test verifies that the tagged outputs written back to Kafka
- * are correct.
- */
-public class RoutableKafkaE2E {
-
- private static final Logger LOG = LoggerFactory.getLogger(RoutableKafkaE2E.class);
-
- private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
- private static final String KAFKA_HOST = "kafka-broker";
-
- @Rule
- public KafkaContainer kafka =
- new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
-
- @Rule
- public StatefulFunctionsAppContainers verificationApp =
- StatefulFunctionsAppContainers.builder("routable-kafka-verification", 1)
- .dependsOn(kafka)
- .exposeMasterLogs(LOG)
- .withBuildContextFileFromClasspath(
- "routable-kafka-ingress-module", "/routable-kafka-ingress-module/")
- .withModuleGlobalConfiguration(
- Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
- .build();
-
- @Test(timeout = 60_000L)
- public void run() {
- final String kafkaAddress = kafka.getBootstrapServers();
-
- final Producer<String, MessageWithAddress> messageProducer =
- kafkaKeyedMessagesProducer(kafkaAddress);
- final Consumer<String, MessageWithAddress> taggedMessageConsumer =
- kafkaTaggedMessagesConsumer(kafkaAddress);
-
- final KafkaIOVerifier<String, MessageWithAddress, String, MessageWithAddress> verifier =
- new KafkaIOVerifier<>(messageProducer, taggedMessageConsumer);
-
- assertThat(
- verifier.sending(
- producerRecord("messages-1", "key-1", message("foo")),
- producerRecord("messages-1", "key-2", message("bar")),
- producerRecord("messages-2", "key-1", message("hello"))),
- verifier.resultsInOrder(
- is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t0", "key-1"), "foo")),
- is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-1"), "foo")),
- is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t0", "key-2"), "bar")),
- is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-2"), "bar")),
- is(taggedMessage(fnAddress(Constants.FUNCTION_NAMESPACE, "t1", "key-1"), "hello"))));
- }
-
- private static Producer<String, MessageWithAddress> kafkaKeyedMessagesProducer(
- String bootstrapServers) {
- Properties props = new Properties();
- props.put("bootstrap.servers", bootstrapServers);
-
- return new KafkaProducer<>(
- props, new StringSerializer(), new KafkaProtobufSerializer<>(MessageWithAddress.parser()));
- }
-
- private Consumer<String, MessageWithAddress> kafkaTaggedMessagesConsumer(
- String bootstrapServers) {
- Properties consumerProps = new Properties();
- consumerProps.setProperty("bootstrap.servers", bootstrapServers);
- consumerProps.setProperty("group.id", "routable-kafka");
- consumerProps.setProperty("auto.offset.reset", "earliest");
-
- KafkaConsumer<String, MessageWithAddress> consumer =
- new KafkaConsumer<>(
- consumerProps,
- new StringDeserializer(),
- new KafkaProtobufSerializer<>(MessageWithAddress.parser()));
- consumer.subscribe(Collections.singletonList(KafkaIO.TAGGED_MESSAGES_TOPIC_NAME));
-
- return consumer;
- }
-
- private static ProducerRecord<String, MessageWithAddress> producerRecord(
- String topic, String key, MessageWithAddress message) {
- return new ProducerRecord<>(topic, key, message);
- }
-
- private static MessageWithAddress message(String message) {
- return MessageWithAddress.newBuilder().setMessage(message).build();
- }
-
- private static MessageWithAddress taggedMessage(FnAddress fromTag, String message) {
- return MessageWithAddress.newBuilder().setFrom(fromTag).setMessage(message).build();
- }
-
- private static FnAddress fnAddress(String namespace, String type, String id) {
- return FnAddress.newBuilder().setNamespace(namespace).setType(type).setId(id).build();
- }
-}
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile
deleted file mode 100644
index 7210f89..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/Dockerfile
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM flink-statefun:2.1-SNAPSHOT
-
-RUN mkdir -p /opt/statefun/modules/statefun-routable-kafka-e2e
-COPY statefun-routable-kafka-e2e*.jar /opt/statefun/modules/statefun-routable-kafka-e2e/
-COPY routable-kafka-ingress-module/ /opt/statefun/modules/statefun-routable-kafka-e2e/
-COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties
deleted file mode 100644
index fb965d3..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml
deleted file mode 100644
index 98fee69..0000000
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/resources/routable-kafka-ingress-module/module.yaml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: "1.0"
-
-module:
- meta:
- type: remote
- spec:
- ingresses:
- - ingress:
- meta:
- type: statefun.kafka.io/routable-protobuf-ingress
- id: org.apache.flink.statefun.e2e/messages
- spec:
- address: kafka-broker:9092
- consumerGroupId: routable-kafka-e2e
- startupPosition:
- type: earliest
- topics:
- - topic: messages-1
- typeUrl: com.googleapis/org.apache.flink.statefun.e2e.routablekafka.MessageWithAddress
- targets:
- - org.apache.flink.e2e.routablekafka/t0
- - org.apache.flink.e2e.routablekafka/t1
- - topic: messages-2
- typeUrl: com.googleapis/org.apache.flink.statefun.e2e.routablekafka.MessageWithAddress
- targets:
- - org.apache.flink.e2e.routablekafka/t1