Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/11 07:56:28 UTC
[flink] 03/03: [FLINK-25266][e2e] Convert StreamingKafkaITCase to SmokeKafkaITCase covering application packaging
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0bc2234b60d1a0635e238d18990695943158123c
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Tue Dec 14 16:03:29 2021 +0100
[FLINK-25266][e2e] Convert StreamingKafkaITCase to SmokeKafkaITCase covering application packaging
---
.../flink/tests/util/kafka/SmokeKafkaITCase.java | 185 ++++++++++++++++++
.../tests/util/kafka/StreamingKafkaITCase.java | 216 ---------------------
.../flink/streaming/kafka/test/KafkaExample.java | 54 +++---
3 files changed, 212 insertions(+), 243 deletions(-)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
new file mode 100644
index 0000000..c1a706f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.connector.kafka.testutils.KafkaUtil;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.JobSubmission;
+import org.apache.flink.tests.util.flink.container.FlinkContainers;
+import org.apache.flink.testutils.junit.FailsOnJava11;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.VoidDeserializer;
+import org.apache.kafka.common.serialization.VoidSerializer;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
+import static org.apache.flink.util.DockerImageVersions.KAFKA;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Smoke test for the Kafka connectors. */
+@Category(value = {FailsOnJava11.class})
+@ExtendWith({TestLoggerExtension.class})
+@Testcontainers
+public class SmokeKafkaITCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SmokeKafkaITCase.class);
+ private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
+ private static final Network NETWORK = Network.newNetwork();
+ private static final String EXAMPLE_JAR_MATCHER = "flink-streaming-kafka-test.*";
+
+ @Container
+ public static final KafkaContainer KAFKA_CONTAINER =
+ createKafkaContainer(KAFKA, LOG)
+ .withEmbeddedZookeeper()
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+
+ @RegisterExtension
+ public static final FlinkContainers FLINK =
+ FlinkContainers.builder()
+ .setConfiguration(getConfiguration())
+ .setLogger(LOG)
+ .dependsOn(KAFKA_CONTAINER)
+ .build();
+
+ private static AdminClient admin;
+ private static KafkaProducer<Void, Integer> producer;
+
+ private static Configuration getConfiguration() {
+ // modify configuration to have enough slots
+ final Configuration flinkConfig = new Configuration();
+ flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+ flinkConfig.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ return flinkConfig;
+ }
+
+ @BeforeAll
+ static void setUp() {
+ final Map<String, Object> adminProperties = new HashMap<>();
+ adminProperties.put(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+ KAFKA_CONTAINER.getBootstrapServers());
+ admin = AdminClient.create(adminProperties);
+ final Properties producerProperties = new Properties();
+ producerProperties.putAll(adminProperties);
+ producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, VoidSerializer.class);
+ producerProperties.put(
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ producer = new KafkaProducer<>(producerProperties);
+ }
+
+ @AfterAll
+ static void teardown() {
+ admin.close();
+ producer.close();
+ }
+
+ @Test
+ public void testKafka() throws Exception {
+ final Path kafkaExampleJar = TestUtils.getResource(EXAMPLE_JAR_MATCHER);
+
+ final String inputTopic = "test-input-" + UUID.randomUUID();
+ final String outputTopic = "test-output-" + UUID.randomUUID();
+
+ // create the required topics and wait until they actually exist
+ final short replicationFactor = 1;
+ admin.createTopics(
+ Lists.newArrayList(
+ new NewTopic(inputTopic, 1, replicationFactor),
+ new NewTopic(outputTopic, 1, replicationFactor)))
+ .all()
+ .get();
+
+ producer.send(new ProducerRecord<>(inputTopic, 1));
+ producer.send(new ProducerRecord<>(inputTopic, 2));
+ producer.send(new ProducerRecord<>(inputTopic, 3));
+ // flush so all records are delivered before the bounded source snapshots the latest offsets
+ producer.flush();
+
+ // run the Flink job
+ FLINK.submitJob(
+ new JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
+ .setDetached(false)
+ .addArgument("--input-topic", inputTopic)
+ .addArgument("--output-topic", outputTopic)
+ .addArgument("--prefix", "PREFIX")
+ .addArgument(
+ "--bootstrap.servers",
+ String.join(
+ ",",
+ KAFKA_CONTAINER.getBootstrapServers(),
+ KAFKA_CONTAINER.getNetworkAliases().stream()
+ .map(
+ host ->
+ String.join(
+ ":",
+ host,
+ Integer.toString(9092)))
+ .collect(Collectors.joining(","))))
+ .addArgument("--group.id", "myconsumer")
+ .addArgument("--auto.offset.reset", "earliest")
+ .addArgument("--transaction.timeout.ms", "900000")
+ .addArgument("--flink.partition-discovery.interval-millis", "1000")
+ .build());
+ final Properties consumerProperties = new Properties();
+ consumerProperties.put(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+ KAFKA_CONTAINER.getBootstrapServers());
+ consumerProperties.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, VoidDeserializer.class);
+ consumerProperties.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ List<Integer> records =
+ KafkaUtil.drainAllRecordsFromTopic(outputTopic, consumerProperties).stream()
+ .map(r -> ByteBuffer.wrap(r.value()).getInt())
+ .collect(Collectors.toList());
+ assertThat(records).hasSize(3).containsExactly(1, 2, 3);
+ }
+}
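The assertion at the end of testKafka() drains the output topic via KafkaUtil.drainAllRecordsFromTopic. As a rough guide to what such a helper does, here is a minimal sketch built on the plain Kafka consumer API; the class name TopicDrainer is hypothetical, and this only approximates the real KafkaUtil implementation:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;

    /** Hypothetical stand-in for KafkaUtil.drainAllRecordsFromTopic (illustration only). */
    public final class TopicDrainer {

        public static List<ConsumerRecord<byte[], byte[]>> drainAllRecords(
                String topic, Properties baseProperties) {
            Properties props = new Properties();
            props.putAll(baseProperties);
            // Read raw bytes; callers decode the values themselves.
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // Assign every partition manually (no consumer group needed) and rewind.
                List<TopicPartition> partitions =
                        consumer.partitionsFor(topic).stream()
                                .map(p -> new TopicPartition(p.topic(), p.partition()))
                                .collect(Collectors.toList());
                consumer.assign(partitions);
                consumer.seekToBeginning(partitions);
                // Snapshot the end offsets and poll until every partition reaches its end.
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
                List<ConsumerRecord<byte[], byte[]>> drained = new ArrayList<>();
                while (endOffsets.entrySet().stream()
                        .anyMatch(e -> consumer.position(e.getKey()) < e.getValue())) {
                    ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofMillis(100));
                    batch.forEach(drained::add);
                }
                return drained;
            }
        }
    }

The drained values arrive as raw bytes, which the test then decodes with ByteBuffer (see the round-trip sketch at the end of this message).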
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
deleted file mode 100644
index e8ad197..0000000
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.kafka;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.tests.util.TestUtils;
-import org.apache.flink.tests.util.flink.ClusterController;
-import org.apache.flink.tests.util.flink.FlinkResource;
-import org.apache.flink.tests.util.flink.FlinkResourceSetup;
-import org.apache.flink.tests.util.flink.JobSubmission;
-import org.apache.flink.testutils.junit.FailsOnJava11;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-/** End-to-end test for the kafka connectors. */
-@RunWith(Parameterized.class)
-@Category(value = {FailsOnJava11.class})
-@Ignore("FLINK-25266")
-public class StreamingKafkaITCase extends TestLogger {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamingKafkaITCase.class);
-
- @Parameterized.Parameters(name = "{index}: kafka-version:{1}")
- public static Collection<Object[]> data() {
- return Arrays.asList(new Object[][] {{"flink-streaming-kafka-test.*", "2.4.1"}});
- }
-
- private final Path kafkaExampleJar;
-
- private final String kafkaVersion;
-
- @Rule
- public final Timeout timeout =
- Timeout.builder()
- .withTimeout(3, TimeUnit.MINUTES)
- .withLookingForStuckThread(true)
- .build();
-
- @Rule public final KafkaResource kafka;
-
- @Rule
- public final FlinkResource flink =
- FlinkResource.get(
- FlinkResourceSetup.builder().addConfiguration(getConfiguration()).build());
-
- private static Configuration getConfiguration() {
- // modify configuration to have enough slots
- final Configuration flinkConfig = new Configuration();
- flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
- return flinkConfig;
- }
-
- public StreamingKafkaITCase(final String kafkaExampleJarPattern, final String kafkaVersion) {
- this.kafkaExampleJar = TestUtils.getResource(kafkaExampleJarPattern);
- this.kafka = KafkaResource.get(kafkaVersion);
- this.kafkaVersion = kafkaVersion;
- }
-
- @Test
- public void testKafka() throws Exception {
- try (final ClusterController clusterController = flink.startCluster(1)) {
-
- final String inputTopic =
- "test-input-" + kafkaVersion + "-" + UUID.randomUUID().toString();
- final String outputTopic =
- "test-output" + kafkaVersion + "-" + UUID.randomUUID().toString();
-
- // create the required topics
- kafka.createTopic(1, 1, inputTopic);
- kafka.createTopic(1, 1, outputTopic);
-
- // run the Flink job (detached mode)
- clusterController.submitJob(
- new JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
- .setDetached(true)
- .addArgument("--input-topic", inputTopic)
- .addArgument("--output-topic", outputTopic)
- .addArgument("--prefix", "PREFIX")
- .addArgument(
- "--bootstrap.servers",
- kafka.getBootstrapServerAddresses().stream()
- .map(
- address ->
- address.getHostString()
- + ':'
- + address.getPort())
- .collect(Collectors.joining(",")))
- .addArgument("--group.id", "myconsumer")
- .addArgument("--auto.offset.reset", "earliest")
- .addArgument("--transaction.timeout.ms", "900000")
- .addArgument("--flink.partition-discovery.interval-millis", "1000")
- .build(),
- Duration.ofMinutes(2L));
-
- LOG.info("Sending messages to Kafka topic [{}] ...", inputTopic);
- // send some data to Kafka
- kafka.sendKeyedMessages(
- inputTopic,
- "\t",
- "key\telephant,5,45218",
- "key\tsquirrel,12,46213",
- "key\tbee,3,51348",
- "key\tsquirrel,22,52444",
- "key\tbee,10,53412",
- "key\telephant,9,54867");
-
- LOG.info("Verifying messages from Kafka topic [{}] ...", outputTopic);
- {
- final List<String> messages = kafka.readMessage(6, "kafka-e2e-driver", outputTopic);
-
- final List<String> elephants = filterMessages(messages, "elephant");
- final List<String> squirrels = filterMessages(messages, "squirrel");
- final List<String> bees = filterMessages(messages, "bee");
-
- // check all keys
- Assert.assertEquals(
- Arrays.asList("elephant,5,45218", "elephant,14,54867"), elephants);
- Assert.assertEquals(
- Arrays.asList("squirrel,12,46213", "squirrel,34,52444"), squirrels);
- Assert.assertEquals(Arrays.asList("bee,3,51348", "bee,13,53412"), bees);
- }
-
- // now, we add a new partition to the topic
- LOG.info("Repartitioning Kafka topic [{}] ...", inputTopic);
- kafka.setNumPartitions(2, inputTopic);
- Assert.assertEquals(
- "Failed adding a partition to input topic.",
- 2,
- kafka.getNumPartitions(inputTopic));
-
- // send some more messages to Kafka
- LOG.info("Sending more messages to Kafka topic [{}] ...", inputTopic);
- kafka.sendKeyedMessages(
- inputTopic,
- "\t",
- "key\telephant,13,64213",
- "key\tgiraffe,9,65555",
- "key\tbee,5,65647",
- "key\tsquirrel,18,66413");
-
- // verify that our assumption that the new partition actually has written messages is
- // correct
- Assert.assertNotEquals(
- "The newly created partition does not have any new messages, and therefore partition discovery cannot be verified.",
- 0L,
- kafka.getPartitionOffset(inputTopic, 1));
-
- LOG.info("Verifying messages from Kafka topic [{}] ...", outputTopic);
- {
- final List<String> messages = kafka.readMessage(4, "kafka-e2e-driver", outputTopic);
-
- final List<String> elephants = filterMessages(messages, "elephant");
- final List<String> squirrels = filterMessages(messages, "squirrel");
- final List<String> bees = filterMessages(messages, "bee");
- final List<String> giraffes = filterMessages(messages, "giraffe");
-
- Assert.assertEquals(
- String.format("Messages from Kafka %s: %s", kafkaVersion, messages),
- Arrays.asList("elephant,27,64213"),
- elephants);
- Assert.assertEquals(
- String.format("Messages from Kafka %s: %s", kafkaVersion, messages),
- Arrays.asList("squirrel,52,66413"),
- squirrels);
- Assert.assertEquals(
- String.format("Messages from Kafka %s: %s", kafkaVersion, messages),
- Arrays.asList("bee,18,65647"),
- bees);
- Assert.assertEquals(
- String.format("Messages from Kafka %s: %s", kafkaVersion, messages),
- Arrays.asList("giraffe,9,65555"),
- giraffes);
- }
- }
- }
-
- private static List<String> filterMessages(final List<String> messages, final String keyword) {
- return messages.stream().filter(msg -> msg.contains(keyword)).collect(Collectors.toList());
- }
-}
diff --git a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
index 01e4819..cb90a1d 100644
--- a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
+++ b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
@@ -17,29 +17,24 @@
package org.apache.flink.streaming.kafka.test;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.kafka.test.base.CustomWatermarkExtractor;
-import org.apache.flink.streaming.kafka.test.base.KafkaEvent;
-import org.apache.flink.streaming.kafka.test.base.KafkaEventSchema;
import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil;
-import org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
/**
- * A simple example that shows how to read from and write to modern Kafka. This will read String
- * messages from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key,
- * and finally perform a rolling addition on each key for which the results are written back to
- * another topic.
- *
- * <p>This example also demonstrates using a watermark assigner to generate per-partition watermarks
- * directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that the String
- * messages are of formatted as a (word,frequency,timestamp) tuple.
+ * A simple application, used as a smoke test example, that forwards messages from one topic to
+ * another topic in batch mode.
*
* <p>Example usage: --input-topic test-input --output-topic test-output --bootstrap.servers
* localhost:9092 --group.id myconsumer
@@ -51,19 +46,25 @@ public class KafkaExample extends KafkaExampleUtil {
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);
- DataStream<KafkaEvent> input =
- env.addSource(
- new FlinkKafkaConsumer<>(
- parameterTool.getRequired("input-topic"),
- new KafkaEventSchema(),
- parameterTool.getProperties())
- .assignTimestampsAndWatermarks(
- new CustomWatermarkExtractor()))
- .keyBy("word")
- .map(new RollingAdditionMapper());
+ DataStream<Integer> input =
+ env.fromSource(
+ KafkaSource.<Integer>builder()
+ .setBootstrapServers(
+ parameterTool
+ .getProperties()
+ .getProperty(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ .setBounded(OffsetsInitializer.latest())
+ .setDeserializer(
+ KafkaRecordDeserializationSchema.valueOnly(
+ IntegerDeserializer.class))
+ .setTopics(parameterTool.getRequired("input-topic"))
+ .build(),
+ WatermarkStrategy.noWatermarks(),
+ "kafka-source");
input.sinkTo(
- KafkaSink.<KafkaEvent>builder()
+ KafkaSink.<Integer>builder()
.setBootstrapServers(
parameterTool
.getProperties()
@@ -71,10 +72,9 @@ public class KafkaExample extends KafkaExampleUtil {
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic(parameterTool.getRequired("output-topic"))
- .setValueSerializationSchema(new KafkaEventSchema())
+ .setKafkaValueSerializer(IntegerSerializer.class)
.build())
.build());
-
- env.execute("Modern Kafka Example");
+ env.execute("Smoke Kafka Example");
}
}
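The rewritten KafkaExample now moves plain integers end to end, so the smoke test relies on Kafka's IntegerSerializer and the ByteBuffer-based decoding in SmokeKafkaITCase agreeing on the wire format: IntegerSerializer emits four big-endian bytes per value, which ByteBuffer.getInt() reads back by default. A tiny self-contained round-trip check (illustrative only, not part of this commit; the class name IntegerWireFormatCheck is made up):

    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.IntegerSerializer;

    import java.nio.ByteBuffer;

    /** Checks that IntegerSerializer emits 4 big-endian bytes per value. */
    public class IntegerWireFormatCheck {
        public static void main(String[] args) {
            byte[] bytes = new IntegerSerializer().serialize("any-topic", 42);
            // Both decodings must agree: ByteBuffer.getInt() is big-endian by default.
            int viaBuffer = ByteBuffer.wrap(bytes).getInt();
            int viaDeserializer = new IntegerDeserializer().deserialize("any-topic", bytes);
            if (bytes.length != 4 || viaBuffer != 42 || viaDeserializer != 42) {
                throw new AssertionError("unexpected Kafka integer wire format");
            }
            System.out.println("round-trip OK: " + viaBuffer);
        }
    }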