Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/11 07:56:28 UTC

[flink] 03/03: [FLINK-25266][e2e] Convert StreamingKafkaITCase to SmokeKafkaITCase covering application packaging

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0bc2234b60d1a0635e238d18990695943158123c
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Tue Dec 14 16:03:29 2021 +0100

    [FLINK-25266][e2e] Convert StreamingKafkaITCase to SmokeKafkaITCase covering application packaging
---
 .../flink/tests/util/kafka/SmokeKafkaITCase.java   | 185 ++++++++++++++++++
 .../tests/util/kafka/StreamingKafkaITCase.java     | 216 ---------------------
 .../flink/streaming/kafka/test/KafkaExample.java   |  54 +++---
 3 files changed, 212 insertions(+), 243 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
new file mode 100644
index 0000000..c1a706f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.connector.kafka.testutils.KafkaUtil;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.JobSubmission;
+import org.apache.flink.tests.util.flink.container.FlinkContainers;
+import org.apache.flink.testutils.junit.FailsOnJava11;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.VoidDeserializer;
+import org.apache.kafka.common.serialization.VoidSerializer;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
+import static org.apache.flink.util.DockerImageVersions.KAFKA;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Smoke test for the Kafka connectors covering application packaging. */
+@Category(value = {FailsOnJava11.class})
+@ExtendWith({TestLoggerExtension.class})
+@Testcontainers
+public class SmokeKafkaITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SmokeKafkaITCase.class);
+    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
+    private static final Network NETWORK = Network.newNetwork();
+    private static final String EXAMPLE_JAR_MATCHER = "flink-streaming-kafka-test.*";
+
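+    // other containers on NETWORK (e.g. the Flink containers) reach Kafka via the alias "kafka"
+    // on port 9092; test code running on the host uses KAFKA_CONTAINER.getBootstrapServers()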
+    @Container
+    public static final KafkaContainer KAFKA_CONTAINER =
+            createKafkaContainer(KAFKA, LOG)
+                    .withEmbeddedZookeeper()
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+
+    @RegisterExtension
+    public static final FlinkContainers FLINK =
+            FlinkContainers.builder()
+                    .setConfiguration(getConfiguration())
+                    .setLogger(LOG)
+                    .dependsOn(KAFKA_CONTAINER)
+                    .build();
+
+    private static AdminClient admin;
+    private static KafkaProducer<Void, Integer> producer;
+
+    private static Configuration getConfiguration() {
+        // modify configuration to have enough slots
+        final Configuration flinkConfig = new Configuration();
+        flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
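+        // allow checkpointing to continue after some tasks have finished (the source is bounded)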
+        flinkConfig.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        return flinkConfig;
+    }
+
+    @BeforeAll
+    static void setUp() {
+        final Map<String, Object> adminProperties = new HashMap<>();
+        adminProperties.put(
+                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                KAFKA_CONTAINER.getBootstrapServers());
+        admin = AdminClient.create(adminProperties);
+        final Properties producerProperties = new Properties();
+        producerProperties.putAll(adminProperties);
+        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, VoidSerializer.class);
+        producerProperties.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producer = new KafkaProducer<>(producerProperties);
+    }
+
+    @AfterAll
+    static void teardown() {
+        admin.close();
+        producer.close();
+    }
+
+    @Test
+    public void testKafka() throws Exception {
+        final Path kafkaExampleJar = TestUtils.getResource(EXAMPLE_JAR_MATCHER);
+
+        final String inputTopic = "test-input-" + UUID.randomUUID();
+        final String outputTopic = "test-output-" + UUID.randomUUID();
+
+        // create the required topics
+        final short replicationFactor = 1;
+        admin.createTopics(
+                Lists.newArrayList(
+                        new NewTopic(inputTopic, 1, replicationFactor),
+                        new NewTopic(outputTopic, 1, replicationFactor)));
+
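+        // produce a small, known set of records for the job to forward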
+        producer.send(new ProducerRecord<>(inputTopic, 1));
+        producer.send(new ProducerRecord<>(inputTopic, 2));
+        producer.send(new ProducerRecord<>(inputTopic, 3));
+
+        // run the Flink job
+        FLINK.submitJob(
+                new JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
+                        .setDetached(false)
+                        .addArgument("--input-topic", inputTopic)
+                        .addArgument("--output-topic", outputTopic)
+                        .addArgument("--prefix", "PREFIX")
+                        .addArgument(
+                                "--bootstrap.servers",
+                                String.join(
+                                        ",",
+                                        KAFKA_CONTAINER.getBootstrapServers(),
+                                        KAFKA_CONTAINER.getNetworkAliases().stream()
+                                                .map(
+                                                        host ->
+                                                                String.join(
+                                                                        ":",
+                                                                        host,
+                                                                        Integer.toString(9092)))
+                                                .collect(Collectors.joining(","))))
+                        .addArgument("--group.id", "myconsumer")
+                        .addArgument("--auto.offset.reset", "earliest")
+                        .addArgument("--transaction.timeout.ms", "900000")
+                        .addArgument("--flink.partition-discovery.interval-millis", "1000")
+                        .build());
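+        // the job was submitted non-detached, so it has finished by now; drain the output topic
+        // and verify that all produced records were forwarded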
+        final Properties consumerProperties = new Properties();
+        consumerProperties.put(
+                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                KAFKA_CONTAINER.getBootstrapServers());
+        consumerProperties.put(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, VoidDeserializer.class);
+        consumerProperties.put(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
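+        // the values were written with Kafka's IntegerSerializer (4-byte big-endian), so decode via ByteBuffer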
+        List<Integer> records =
+                KafkaUtil.drainAllRecordsFromTopic(outputTopic, consumerProperties).stream()
+                        .map(r -> ByteBuffer.wrap(r.value()).getInt())
+                        .collect(Collectors.toList());
+        assertThat(records).hasSize(3).containsExactly(1, 2, 3);
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
deleted file mode 100644
index e8ad197..0000000
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.kafka;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.tests.util.TestUtils;
-import org.apache.flink.tests.util.flink.ClusterController;
-import org.apache.flink.tests.util.flink.FlinkResource;
-import org.apache.flink.tests.util.flink.FlinkResourceSetup;
-import org.apache.flink.tests.util.flink.JobSubmission;
-import org.apache.flink.testutils.junit.FailsOnJava11;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-/** End-to-end test for the kafka connectors. */
-@RunWith(Parameterized.class)
-@Category(value = {FailsOnJava11.class})
-@Ignore("FLINK-25266")
-public class StreamingKafkaITCase extends TestLogger {
-
-    private static final Logger LOG = LoggerFactory.getLogger(StreamingKafkaITCase.class);
-
-    @Parameterized.Parameters(name = "{index}: kafka-version:{1}")
-    public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] {{"flink-streaming-kafka-test.*", "2.4.1"}});
-    }
-
-    private final Path kafkaExampleJar;
-
-    private final String kafkaVersion;
-
-    @Rule
-    public final Timeout timeout =
-            Timeout.builder()
-                    .withTimeout(3, TimeUnit.MINUTES)
-                    .withLookingForStuckThread(true)
-                    .build();
-
-    @Rule public final KafkaResource kafka;
-
-    @Rule
-    public final FlinkResource flink =
-            FlinkResource.get(
-                    FlinkResourceSetup.builder().addConfiguration(getConfiguration()).build());
-
-    private static Configuration getConfiguration() {
-        // modify configuration to have enough slots
-        final Configuration flinkConfig = new Configuration();
-        flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
-        return flinkConfig;
-    }
-
-    public StreamingKafkaITCase(final String kafkaExampleJarPattern, final String kafkaVersion) {
-        this.kafkaExampleJar = TestUtils.getResource(kafkaExampleJarPattern);
-        this.kafka = KafkaResource.get(kafkaVersion);
-        this.kafkaVersion = kafkaVersion;
-    }
-
-    @Test
-    public void testKafka() throws Exception {
-        try (final ClusterController clusterController = flink.startCluster(1)) {
-
-            final String inputTopic =
-                    "test-input-" + kafkaVersion + "-" + UUID.randomUUID().toString();
-            final String outputTopic =
-                    "test-output" + kafkaVersion + "-" + UUID.randomUUID().toString();
-
-            // create the required topics
-            kafka.createTopic(1, 1, inputTopic);
-            kafka.createTopic(1, 1, outputTopic);
-
-            // run the Flink job (detached mode)
-            clusterController.submitJob(
-                    new JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
-                            .setDetached(true)
-                            .addArgument("--input-topic", inputTopic)
-                            .addArgument("--output-topic", outputTopic)
-                            .addArgument("--prefix", "PREFIX")
-                            .addArgument(
-                                    "--bootstrap.servers",
-                                    kafka.getBootstrapServerAddresses().stream()
-                                            .map(
-                                                    address ->
-                                                            address.getHostString()
-                                                                    + ':'
-                                                                    + address.getPort())
-                                            .collect(Collectors.joining(",")))
-                            .addArgument("--group.id", "myconsumer")
-                            .addArgument("--auto.offset.reset", "earliest")
-                            .addArgument("--transaction.timeout.ms", "900000")
-                            .addArgument("--flink.partition-discovery.interval-millis", "1000")
-                            .build(),
-                    Duration.ofMinutes(2L));
-
-            LOG.info("Sending messages to Kafka topic [{}] ...", inputTopic);
-            // send some data to Kafka
-            kafka.sendKeyedMessages(
-                    inputTopic,
-                    "\t",
-                    "key\telephant,5,45218",
-                    "key\tsquirrel,12,46213",
-                    "key\tbee,3,51348",
-                    "key\tsquirrel,22,52444",
-                    "key\tbee,10,53412",
-                    "key\telephant,9,54867");
-
-            LOG.info("Verifying messages from Kafka topic [{}] ...", outputTopic);
-            {
-                final List<String> messages = kafka.readMessage(6, "kafka-e2e-driver", outputTopic);
-
-                final List<String> elephants = filterMessages(messages, "elephant");
-                final List<String> squirrels = filterMessages(messages, "squirrel");
-                final List<String> bees = filterMessages(messages, "bee");
-
-                // check all keys
-                Assert.assertEquals(
-                        Arrays.asList("elephant,5,45218", "elephant,14,54867"), elephants);
-                Assert.assertEquals(
-                        Arrays.asList("squirrel,12,46213", "squirrel,34,52444"), squirrels);
-                Assert.assertEquals(Arrays.asList("bee,3,51348", "bee,13,53412"), bees);
-            }
-
-            // now, we add a new partition to the topic
-            LOG.info("Repartitioning Kafka topic [{}] ...", inputTopic);
-            kafka.setNumPartitions(2, inputTopic);
-            Assert.assertEquals(
-                    "Failed adding a partition to input topic.",
-                    2,
-                    kafka.getNumPartitions(inputTopic));
-
-            // send some more messages to Kafka
-            LOG.info("Sending more messages to Kafka topic [{}] ...", inputTopic);
-            kafka.sendKeyedMessages(
-                    inputTopic,
-                    "\t",
-                    "key\telephant,13,64213",
-                    "key\tgiraffe,9,65555",
-                    "key\tbee,5,65647",
-                    "key\tsquirrel,18,66413");
-
-            // verify that our assumption that the new partition actually has written messages is
-            // correct
-            Assert.assertNotEquals(
-                    "The newly created partition does not have any new messages, and therefore partition discovery cannot be verified.",
-                    0L,
-                    kafka.getPartitionOffset(inputTopic, 1));
-
-            LOG.info("Verifying messages from Kafka topic [{}] ...", outputTopic);
-            {
-                final List<String> messages = kafka.readMessage(4, "kafka-e2e-driver", outputTopic);
-
-                final List<String> elephants = filterMessages(messages, "elephant");
-                final List<String> squirrels = filterMessages(messages, "squirrel");
-                final List<String> bees = filterMessages(messages, "bee");
-                final List<String> giraffes = filterMessages(messages, "giraffe");
-
-                Assert.assertEquals(
-                        String.format("Messages from Kafka %s: %s", kafkaVersion, messages),
-                        Arrays.asList("elephant,27,64213"),
-                        elephants);
-                Assert.assertEquals(
-                        String.format("Messages from Kafka %s: %s", kafkaVersion, messages),
-                        Arrays.asList("squirrel,52,66413"),
-                        squirrels);
-                Assert.assertEquals(
-                        String.format("Messages from Kafka %s: %s", kafkaVersion, messages),
-                        Arrays.asList("bee,18,65647"),
-                        bees);
-                Assert.assertEquals(
-                        String.format("Messages from Kafka %s: %s", kafkaVersion, messages),
-                        Arrays.asList("giraffe,9,65555"),
-                        giraffes);
-            }
-        }
-    }
-
-    private static List<String> filterMessages(final List<String> messages, final String keyword) {
-        return messages.stream().filter(msg -> msg.contains(keyword)).collect(Collectors.toList());
-    }
-}
diff --git a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
index 01e4819..cb90a1d 100644
--- a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
+++ b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
@@ -17,29 +17,24 @@
 
 package org.apache.flink.streaming.kafka.test;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.kafka.test.base.CustomWatermarkExtractor;
-import org.apache.flink.streaming.kafka.test.base.KafkaEvent;
-import org.apache.flink.streaming.kafka.test.base.KafkaEventSchema;
 import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil;
-import org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 
 /**
- * A simple example that shows how to read from and write to modern Kafka. This will read String
- * messages from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key,
- * and finally perform a rolling addition on each key for which the results are written back to
- * another topic.
- *
- * <p>This example also demonstrates using a watermark assigner to generate per-partition watermarks
- * directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that the String
- * messages are of formatted as a (word,frequency,timestamp) tuple.
+ * A simple application, used as a smoke test example, that forwards messages from one topic to
+ * another topic in batch mode.
  *
  * <p>Example usage: --input-topic test-input --output-topic test-output --bootstrap.servers
  * localhost:9092 --group.id myconsumer
@@ -51,19 +46,25 @@ public class KafkaExample extends KafkaExampleUtil {
         final ParameterTool parameterTool = ParameterTool.fromArgs(args);
         StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);
 
-        DataStream<KafkaEvent> input =
-                env.addSource(
-                                new FlinkKafkaConsumer<>(
-                                                parameterTool.getRequired("input-topic"),
-                                                new KafkaEventSchema(),
-                                                parameterTool.getProperties())
-                                        .assignTimestampsAndWatermarks(
-                                                new CustomWatermarkExtractor()))
-                        .keyBy("word")
-                        .map(new RollingAdditionMapper());
+        DataStream<Integer> input =
+                env.fromSource(
+                        KafkaSource.<Integer>builder()
+                                .setBootstrapServers(
+                                        parameterTool
+                                                .getProperties()
+                                                .getProperty(
+                                                        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
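+                                // bounded mode: stop at the offsets observed when the job starts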
+                                .setBounded(OffsetsInitializer.latest())
+                                .setDeserializer(
+                                        KafkaRecordDeserializationSchema.valueOnly(
+                                                IntegerDeserializer.class))
+                                .setTopics(parameterTool.getRequired("input-topic"))
+                                .build(),
+                        WatermarkStrategy.noWatermarks(),
+                        "kafka-source");
 
         input.sinkTo(
-                KafkaSink.<KafkaEvent>builder()
+                KafkaSink.<Integer>builder()
                         .setBootstrapServers(
                                 parameterTool
                                         .getProperties()
@@ -71,10 +72,9 @@ public class KafkaExample extends KafkaExampleUtil {
                         .setRecordSerializer(
                                 KafkaRecordSerializationSchema.builder()
                                         .setTopic(parameterTool.getRequired("output-topic"))
-                                        .setValueSerializationSchema(new KafkaEventSchema())
+                                        .setKafkaValueSerializer(IntegerSerializer.class)
                                         .build())
                         .build());
-
-        env.execute("Modern Kafka Example");
+        env.execute("Smoke Kafka Example");
     }
 }