You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/18 12:58:46 UTC
[flink] 05/05: [FLINK-15670] Kafka Shuffle Test Case + add log4j2 file
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 37f6db04de7c0bc092fa07e821f7082549b167ef
Author: Yuan Mei <yu...@gmail.com>
AuthorDate: Mon May 18 17:06:03 2020 +0800
[FLINK-15670] Kafka Shuffle Test Case + add log4j2 file
KafkaShuffle provides a transparent Kafka source and sink pair, through which the network traffic of a shuffle step is persisted and redirected.
---
.../shuffle/KafkaShuffleExactlyOnceITCase.java | 205 +++++++++
.../kafka/shuffle/KafkaShuffleITCase.java | 476 +++++++++++++++++++++
.../kafka/shuffle/KafkaShuffleTestBase.java | 269 ++++++++++++
.../src/test/resources/log4j2-test.properties | 38 ++
4 files changed, 988 insertions(+)
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java
new file mode 100644
index 0000000..8406862
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.IngestionTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+/**
+ * Failure Recovery IT Test for KafkaShuffle.
+ */
+public class KafkaShuffleExactlyOnceITCase extends KafkaShuffleTestBase {
+
+ // Hard cap of 10 minutes per test method, so a stuck Kafka/Flink job fails the build instead of hanging it.
+ @Rule
+ public final Timeout timeout = Timeout.millis(600000L);
+
+ /**
+ * Failure Recovery after processing 2/3 data with time characteristic: ProcessingTime.
+ *
+ * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+ */
+ @Test
+ public void testFailureRecoveryProcessingTime() throws Exception {
+ testKafkaShuffleFailureRecovery(1000, ProcessingTime);
+ }
+
+ /**
+ * Failure Recovery after processing 2/3 data with time characteristic: IngestionTime.
+ *
+ * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+ */
+ @Test
+ public void testFailureRecoveryIngestionTime() throws Exception {
+ testKafkaShuffleFailureRecovery(1000, IngestionTime);
+ }
+
+ /**
+ * Failure Recovery after processing 2/3 data with time characteristic: EventTime.
+ *
+ * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+ */
+ @Test
+ public void testFailureRecoveryEventTime() throws Exception {
+ testKafkaShuffleFailureRecovery(1000, EventTime);
+ }
+
+ /**
+ * Failure Recovery after data is repartitioned with time characteristic: ProcessingTime.
+ *
+ * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+ */
+ @Test
+ public void testAssignedToPartitionFailureRecoveryProcessingTime() throws Exception {
+ testAssignedToPartitionFailureRecovery(500, ProcessingTime);
+ }
+
+ /**
+ * Failure Recovery after data is repartitioned with time characteristic: IngestionTime.
+ *
+ * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+ */
+ @Test
+ public void testAssignedToPartitionFailureRecoveryIngestionTime() throws Exception {
+ testAssignedToPartitionFailureRecovery(500, IngestionTime);
+ }
+
+ /**
+ * Failure Recovery after data is repartitioned with time characteristic: EventTime.
+ *
+ * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+ */
+ @Test
+ public void testAssignedToPartitionFailureRecoveryEventTime() throws Exception {
+ testAssignedToPartitionFailureRecovery(500, EventTime);
+ }
+
+ /**
+ * To test failure recovery after processing 2/3 data.
+ *
+ * <p>Schema: (key, timestamp, source instance Id).
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1
+ */
+ private void testKafkaShuffleFailureRecovery(
+ int numElementsPerProducer,
+ TimeCharacteristic timeCharacteristic) throws Exception {
+
+ String topic = topic("failure_recovery", timeCharacteristic);
+ final int numberOfPartitions = 1;
+ final int producerParallelism = 1;
+ // Inject the failure after roughly 2/3 of all records have passed through the mapper.
+ final int failAfterElements = numElementsPerProducer * numberOfPartitions * 2 / 3;
+
+ createTestTopic(topic, numberOfPartitions, 1);
+
+ // NOTE(review): createEnvironment(...) already calls enableCheckpointing(500), so this
+ // second enableCheckpointing(500) is redundant (but harmless).
+ final StreamExecutionEnvironment env =
+ createEnvironment(producerParallelism, timeCharacteristic).enableCheckpointing(500);
+
+ createKafkaShuffle(
+ env, topic, numElementsPerProducer, producerParallelism, timeCharacteristic, numberOfPartitions)
+ .map(new FailingIdentityMapper<>(failAfterElements)).setParallelism(1)
+ .map(new ToInteger(producerParallelism)).setParallelism(1)
+ .addSink(new ValidatingExactlyOnceSink(numElementsPerProducer * producerParallelism)).setParallelism(1);
+
+ // Reset the mapper's static failure flag so the induced failure happens (once) during this run.
+ FailingIdentityMapper.failedBefore = false;
+
+ tryExecute(env, topic);
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * To test failure recovery with partition assignment after processing 2/3 data.
+ *
+ * <p>Schema: (key, timestamp, source instance Id).
+ * Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3
+ */
+ private void testAssignedToPartitionFailureRecovery(
+ int numElementsPerProducer,
+ TimeCharacteristic timeCharacteristic) throws Exception {
+ String topic = topic("partition_failure_recovery", timeCharacteristic);
+ final int numberOfPartitions = 3;
+ final int producerParallelism = 2;
+ // Inject the failure after roughly 2/3 of all records (across all producers) have been processed.
+ final int failAfterElements = numElementsPerProducer * producerParallelism * 2 / 3;
+
+ createTestTopic(topic, numberOfPartitions, 1);
+
+ final StreamExecutionEnvironment env = createEnvironment(producerParallelism, timeCharacteristic);
+
+ KeyedStream<Tuple3<Integer, Long, Integer>, Tuple> keyedStream = createKafkaShuffle(
+ env,
+ topic,
+ numElementsPerProducer,
+ producerParallelism,
+ timeCharacteristic,
+ numberOfPartitions);
+ keyedStream
+ .process(new PartitionValidator(keyedStream.getKeySelector(), numberOfPartitions, topic))
+ .setParallelism(numberOfPartitions)
+ .map(new ToInteger(producerParallelism)).setParallelism(numberOfPartitions)
+ .map(new FailingIdentityMapper<>(failAfterElements)).setParallelism(1)
+ .addSink(new ValidatingExactlyOnceSink(numElementsPerProducer * producerParallelism)).setParallelism(1);
+
+ // Reset the mapper's static failure flag so the induced failure happens (once) during this run.
+ FailingIdentityMapper.failedBefore = false;
+
+ tryExecute(env, topic);
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * Environment with checkpointing every 500ms, a single zero-delay restart (to recover
+ * from the injected failure exactly once), and zero buffer timeout.
+ */
+ private StreamExecutionEnvironment createEnvironment(
+ int producerParallelism,
+ TimeCharacteristic timeCharacteristic) {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(producerParallelism);
+ env.setStreamTimeCharacteristic(timeCharacteristic);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.setBufferTimeout(0);
+ env.enableCheckpointing(500);
+
+ return env;
+ }
+
+ /**
+ * Flattens a shuffle record (key, timestamp, source instance id) into the single int
+ * {@code key * producerParallelism + sourceInstanceId} for the exactly-once sink.
+ * Presumably this yields a distinct value per produced record — verify against the
+ * key generation in KafkaShuffleTestBase's source function.
+ */
+ private static class ToInteger implements MapFunction<Tuple3<Integer, Long, Integer>, Integer> {
+ private final int producerParallelism;
+
+ ToInteger(int producerParallelism) {
+ this.producerParallelism = producerParallelism;
+ }
+
+ @Override
+ public Integer map(Tuple3<Integer, Long, Integer> element) throws Exception {
+
+ return element.f0 * producerParallelism + element.f2;
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
new file mode 100644
index 0000000..805f7ef
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElement;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElementDeserializer;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleRecord;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleWatermark;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.IngestionTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.fail;
+
+/**
+ * Simple End to End Test for KafkaShuffle.
+ */
+public class KafkaShuffleITCase extends KafkaShuffleTestBase {
+
+ // Hard cap of 10 minutes per test method, so a stuck Kafka/Flink job fails the build instead of hanging it.
+ @Rule
+ public final Timeout timeout = Timeout.millis(600000L);
+
+ /**
+ * To test no data is lost or duplicated end-2-end with the default time characteristic: ProcessingTime.
+ *
+ * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+ */
+ @Test
+ public void testSimpleProcessingTime() throws Exception {
+ testKafkaShuffle(200000, ProcessingTime);
+ }
+
+ /**
+ * To test no data is lost or duplicated end-2-end with time characteristic: IngestionTime.
+ *
+ * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+ */
+ @Test
+ public void testSimpleIngestionTime() throws Exception {
+ testKafkaShuffle(200000, IngestionTime);
+ }
+
+ /**
+ * To test no data is lost or duplicated end-2-end with time characteristic: EventTime.
+ *
+ * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+ */
+ @Test
+ public void testSimpleEventTime() throws Exception {
+ testKafkaShuffle(100000, EventTime);
+ }
+
+ /**
+ * To test data is partitioned to the right partition with time characteristic: ProcessingTime.
+ *
+ * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+ */
+ @Test
+ public void testAssignedToPartitionProcessingTime() throws Exception {
+ testAssignedToPartition(300000, ProcessingTime);
+ }
+
+ /**
+ * To test data is partitioned to the right partition with time characteristic: IngestionTime.
+ *
+ * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+ */
+ @Test
+ public void testAssignedToPartitionIngestionTime() throws Exception {
+ testAssignedToPartition(300000, IngestionTime);
+ }
+
+ /**
+ * To test data is partitioned to the right partition with time characteristic: EventTime.
+ *
+ * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+ */
+ @Test
+ public void testAssignedToPartitionEventTime() throws Exception {
+ testAssignedToPartition(100000, EventTime);
+ }
+
+ /**
+ * To test watermark is monotonically incremental with randomized watermark.
+ *
+ * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+ */
+ @Test
+ public void testWatermarkIncremental() throws Exception {
+ testWatermarkIncremental(100000);
+ }
+
+ /**
+ * To test value serialization and deserialization with time characteristic: ProcessingTime.
+ *
+ * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+ */
+ @Test
+ public void testSerDeProcessingTime() throws Exception {
+ testRecordSerDe(ProcessingTime);
+ }
+
+ /**
+ * To test value and watermark serialization and deserialization with time characteristic: IngestionTime.
+ *
+ * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+ */
+ @Test
+ public void testSerDeIngestionTime() throws Exception {
+ testRecordSerDe(IngestionTime);
+ }
+
+ /**
+ * To test value and watermark serialization and deserialization with time characteristic: EventTime.
+ *
+ * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+ */
+ @Test
+ public void testSerDeEventTime() throws Exception {
+ testRecordSerDe(EventTime);
+ }
+
+ /**
+ * To test each producer subtask broadcasts its watermarks to every partition,
+ * with time characteristic: EventTime.
+ *
+ * <p>Producer Parallelism = 2; Kafka Partition # = 3.
+ */
+ @Test
+ public void testWatermarkBroadcasting() throws Exception {
+ final int numberOfPartitions = 3;
+ final int producerParallelism = 2;
+ final int numElementsPerProducer = 1000;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ Map<Integer, Collection<ConsumerRecord<byte[], byte[]>>> results = testKafkaShuffleProducer(
+ topic("test_watermark_broadcast", EventTime),
+ env,
+ numberOfPartitions,
+ producerParallelism,
+ numElementsPerProducer,
+ EventTime);
+ TypeSerializer<Tuple3<Integer, Long, Integer>> typeSerializer = createTypeSerializer(env);
+ KafkaShuffleElementDeserializer deserializer = new KafkaShuffleElementDeserializer<>(typeSerializer);
+
+ // Records in a single partition are kept in order
+ for (int p = 0; p < numberOfPartitions; p++) {
+ Collection<ConsumerRecord<byte[], byte[]>> records = results.get(p);
+ // producer subtask index -> watermarks received from that subtask, in arrival order
+ Map<Integer, List<KafkaShuffleWatermark>> watermarks = new HashMap<>();
+
+ for (ConsumerRecord<byte[], byte[]> consumerRecord : records) {
+ Assert.assertNull(consumerRecord.key());
+ KafkaShuffleElement element = deserializer.deserialize(consumerRecord);
+ if (element.isRecord()) {
+ KafkaShuffleRecord<Tuple3<Integer, Long, Integer>> record = element.asRecord();
+ Assert.assertEquals(record.getValue().f1.longValue(), INIT_TIMESTAMP + record.getValue().f0);
+ Assert.assertEquals(record.getTimestamp().longValue(), record.getValue().f1.longValue());
+ } else if (element.isWatermark()) {
+ KafkaShuffleWatermark watermark = element.asWatermark();
+ watermarks.computeIfAbsent(watermark.getSubtask(), k -> new ArrayList<>());
+ watermarks.get(watermark.getSubtask()).add(watermark);
+ } else {
+ fail("KafkaShuffleElement is either record or watermark");
+ }
+ }
+
+ // According to the setting how watermarks are generated in this ITTest,
+ // every producer task emits a watermark corresponding to each record + the end-of-event-time watermark.
+ // Hence each producer sub task generates `numElementsPerProducer + 1` watermarks.
+ // Each producer sub task broadcasts these `numElementsPerProducer + 1` watermarks to all partitions.
+ // Thus in total, each producer sub task emits `(numElementsPerProducer + 1) * numberOfPartitions` watermarks.
+ // From the consumer side, each partition receives `(numElementsPerProducer + 1) * producerParallelism` watermarks,
+ // with each producer sub task produces `numElementsPerProducer + 1` watermarks.
+ // Besides, watermarks from the same producer sub task should keep in order.
+ for (List<KafkaShuffleWatermark> subTaskWatermarks : watermarks.values()) {
+ int index = 0;
+ Assert.assertEquals(numElementsPerProducer + 1, subTaskWatermarks.size());
+ for (KafkaShuffleWatermark watermark : subTaskWatermarks) {
+ if (index == numElementsPerProducer) {
+ // the last element is the watermark that signifies end-of-event-time
+ Assert.assertEquals(watermark.getWatermark(), Watermark.MAX_WATERMARK.getTimestamp());
+ } else {
+ Assert.assertEquals(watermark.getWatermark(), INIT_TIMESTAMP + index++);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * To test no data is lost or duplicated end-2-end.
+ *
+ * <p>Schema: (key, timestamp, source instance Id).
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1
+ */
+ private void testKafkaShuffle(
+ int numElementsPerProducer,
+ TimeCharacteristic timeCharacteristic) throws Exception {
+ String topic = topic("test_simple", timeCharacteristic);
+ final int numberOfPartitions = 1;
+ final int producerParallelism = 1;
+
+ createTestTopic(topic, numberOfPartitions, 1);
+
+ final StreamExecutionEnvironment env = createEnvironment(producerParallelism, timeCharacteristic);
+ createKafkaShuffle(
+ env,
+ topic,
+ numElementsPerProducer,
+ producerParallelism,
+ timeCharacteristic,
+ numberOfPartitions)
+ .map(new ElementCountNoMoreThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1)
+ .map(new ElementCountNoLessThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1);
+
+ tryExecute(env, topic);
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * To test data is partitioned to the right partition.
+ *
+ * <p>Schema: (key, timestamp, source instance Id).
+ * Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3
+ */
+ private void testAssignedToPartition(
+ int numElementsPerProducer,
+ TimeCharacteristic timeCharacteristic) throws Exception {
+ String topic = topic("test_assigned_to_partition", timeCharacteristic);
+ final int numberOfPartitions = 3;
+ final int producerParallelism = 2;
+
+ createTestTopic(topic, numberOfPartitions, 1);
+
+ final StreamExecutionEnvironment env = createEnvironment(producerParallelism, timeCharacteristic);
+
+ KeyedStream<Tuple3<Integer, Long, Integer>, Tuple> keyedStream = createKafkaShuffle(
+ env,
+ topic,
+ numElementsPerProducer,
+ producerParallelism,
+ timeCharacteristic,
+ numberOfPartitions);
+ keyedStream
+ .process(new PartitionValidator(keyedStream.getKeySelector(), numberOfPartitions, topic))
+ .setParallelism(numberOfPartitions)
+ .map(new ElementCountNoMoreThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1)
+ .map(new ElementCountNoLessThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1);
+
+ tryExecute(env, topic);
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * To test that watermarks seen on the consumer side always increase.
+ *
+ * <p>Schema: (key, timestamp, source instance Id).
+ * Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3
+ */
+ private void testWatermarkIncremental(int numElementsPerProducer) throws Exception {
+ TimeCharacteristic timeCharacteristic = EventTime;
+ String topic = topic("test_watermark_incremental", timeCharacteristic);
+ final int numberOfPartitions = 3;
+ final int producerParallelism = 2;
+
+ createTestTopic(topic, numberOfPartitions, 1);
+
+ final StreamExecutionEnvironment env = createEnvironment(producerParallelism, timeCharacteristic);
+
+ KeyedStream<Tuple3<Integer, Long, Integer>, Tuple> keyedStream = createKafkaShuffle(
+ env,
+ topic,
+ numElementsPerProducer,
+ producerParallelism,
+ timeCharacteristic,
+ numberOfPartitions,
+ true);
+ keyedStream
+ .process(new WatermarkValidator())
+ .setParallelism(numberOfPartitions)
+ .map(new ElementCountNoMoreThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1)
+ .map(new ElementCountNoLessThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1);
+
+ tryExecute(env, topic);
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * Writes records through the shuffle producer, then deserializes the raw bytes of the single
+ * partition and checks record/watermark serialization per time characteristic.
+ */
+ private void testRecordSerDe(TimeCharacteristic timeCharacteristic) throws Exception {
+ final int numElementsPerProducer = 2000;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // Records in a single partition are kept in order
+ Collection<ConsumerRecord<byte[], byte[]>> records = Iterables.getOnlyElement(
+ testKafkaShuffleProducer(
+ topic("test_serde", timeCharacteristic), env, 1, 1, numElementsPerProducer, timeCharacteristic).values());
+
+ // NOTE(review): Assert.assertEquals is called as (actual, expected) here and below;
+ // JUnit's convention is (expected, actual), so failure messages will read reversed.
+ switch (timeCharacteristic) {
+ case ProcessingTime:
+ // NonTimestampContext, no watermark
+ Assert.assertEquals(records.size(), numElementsPerProducer);
+ break;
+ case IngestionTime:
+ // IngestionTime uses AutomaticWatermarkContext and it emits a watermark after every `watermarkInterval`
+ // with default interval 200, hence difficult to control the number of watermarks
+ break;
+ case EventTime:
+ // ManualWatermarkContext
+ // `numElementsPerProducer` records, `numElementsPerProducer` watermarks, and one end-of-event-time watermark
+ Assert.assertEquals(records.size(), numElementsPerProducer * 2 + 1);
+ break;
+ default:
+ fail("unknown TimeCharacteristic type");
+ }
+
+ TypeSerializer<Tuple3<Integer, Long, Integer>> typeSerializer = createTypeSerializer(env);
+
+ KafkaShuffleElementDeserializer deserializer = new KafkaShuffleElementDeserializer<>(typeSerializer);
+
+ int recordIndex = 0;
+ int watermarkIndex = 0;
+ for (ConsumerRecord<byte[], byte[]> consumerRecord : records) {
+ Assert.assertNull(consumerRecord.key());
+ KafkaShuffleElement element = deserializer.deserialize(consumerRecord);
+ if (element.isRecord()) {
+ KafkaShuffleRecord<Tuple3<Integer, Long, Integer>> record = element.asRecord();
+ switch (timeCharacteristic) {
+ case ProcessingTime:
+ Assert.assertNull(record.getTimestamp());
+ break;
+ case IngestionTime:
+ Assert.assertNotNull(record.getTimestamp());
+ break;
+ case EventTime:
+ Assert.assertEquals(record.getTimestamp().longValue(), record.getValue().f1.longValue());
+ break;
+ default:
+ fail("unknown TimeCharacteristic type");
+ }
+ Assert.assertEquals(record.getValue().f0.intValue(), recordIndex);
+ Assert.assertEquals(record.getValue().f1.longValue(), INIT_TIMESTAMP + recordIndex);
+ Assert.assertEquals(record.getValue().f2.intValue(), 0);
+ recordIndex++;
+ } else if (element.isWatermark()) {
+ switch (timeCharacteristic) {
+ case ProcessingTime:
+ fail("Watermarks should not be generated in the case of ProcessingTime");
+ break;
+ case IngestionTime:
+ break;
+ case EventTime:
+ KafkaShuffleWatermark watermark = element.asWatermark();
+ Assert.assertEquals(watermark.getSubtask(), 0);
+ if (watermarkIndex == recordIndex) {
+ // the last element is the watermark that signifies end-of-event-time
+ Assert.assertEquals(watermark.getWatermark(), Watermark.MAX_WATERMARK.getTimestamp());
+ } else {
+ Assert.assertEquals(watermark.getWatermark(), INIT_TIMESTAMP + watermarkIndex);
+ }
+ break;
+ default:
+ fail("unknown TimeCharacteristic type");
+ }
+ watermarkIndex++;
+ } else {
+ fail("KafkaShuffleElement is either record or watermark");
+ }
+ }
+ }
+
+ /**
+ * Writes `numElementsPerProducer` records per producer subtask into `topic` via
+ * FlinkKafkaShuffle.writeKeyBy, then reads back the raw bytes of every partition.
+ * The topic is created before the job and deleted after consumption.
+ *
+ * @return map of partition id to the raw consumer records of that partition
+ */
+ private Map<Integer, Collection<ConsumerRecord<byte[], byte[]>>> testKafkaShuffleProducer(
+ String topic,
+ StreamExecutionEnvironment env,
+ int numberOfPartitions,
+ int producerParallelism,
+ int numElementsPerProducer,
+ TimeCharacteristic timeCharacteristic) throws Exception {
+ createTestTopic(topic, numberOfPartitions, 1);
+
+ env.setParallelism(producerParallelism);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setStreamTimeCharacteristic(timeCharacteristic);
+
+ DataStream<Tuple3<Integer, Long, Integer>> source =
+ env.addSource(new KafkaSourceFunction(numElementsPerProducer, false)).setParallelism(producerParallelism);
+ // Only EventTime needs explicit (punctuated) watermark assignment on the producer side.
+ DataStream<Tuple3<Integer, Long, Integer>> input = (timeCharacteristic == EventTime) ?
+ source.assignTimestampsAndWatermarks(new PunctuatedExtractor()).setParallelism(producerParallelism) : source;
+
+ Properties properties = kafkaServer.getStandardProperties();
+ Properties kafkaProperties = PropertiesUtil.flatten(properties);
+
+ kafkaProperties.setProperty(PRODUCER_PARALLELISM, String.valueOf(producerParallelism));
+ kafkaProperties.setProperty(PARTITION_NUMBER, String.valueOf(numberOfPartitions));
+ // Raw byte deserializers: the test inspects the serialized shuffle elements directly.
+ kafkaProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ kafkaProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ FlinkKafkaShuffle.writeKeyBy(input, topic, kafkaProperties, 0);
+
+ env.execute("Write to " + topic);
+ ImmutableMap.Builder<Integer, Collection<ConsumerRecord<byte[], byte[]>>> results = ImmutableMap.builder();
+
+ for (int p = 0; p < numberOfPartitions; p++) {
+ results.put(p, kafkaServer.getAllRecordsFromTopic(kafkaProperties, topic, p, 5000));
+ }
+
+ deleteTestTopic(topic);
+
+ return results.build();
+ }
+
+ /**
+ * Environment with the given producer parallelism and time characteristic; restarts disabled
+ * because these tests expect a single, failure-free run.
+ */
+ private StreamExecutionEnvironment createEnvironment(
+ int producerParallelism,
+ TimeCharacteristic timeCharacteristic) {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(producerParallelism);
+ env.setStreamTimeCharacteristic(timeCharacteristic);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ return env;
+ }
+
+ /** Serializer for the (key, timestamp, source instance id) shuffle tuple. */
+ private TypeSerializer<Tuple3<Integer, Long, Integer>> createTypeSerializer(StreamExecutionEnvironment env) {
+ return new TupleTypeInfo<Tuple3<Integer, Long, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO)
+ .createSerializer(env.getConfig());
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleTestBase.java
new file mode 100644
index 0000000..a42b151
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleTestBase.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+
+import org.junit.BeforeClass;
+
+import java.util.Random;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+
+/**
+ * Base Test Class for KafkaShuffle.
+ */
+public class KafkaShuffleTestBase extends KafkaConsumerTestBase {
+ // Common base timestamp shared by all source subtasks; generated event times
+ // start here and increase by one per emitted element.
+ static final long INIT_TIMESTAMP = System.currentTimeMillis();
+
+ @BeforeClass
+ public static void prepare() throws Exception {
+ // Reuse the shared Kafka test cluster setup, then force EXACTLY_ONCE producer
+ // semantics, which the KafkaShuffle tests rely on.
+ KafkaProducerTestBase.prepare();
+ ((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
+ }
+
+ /**
+  * Parallel test source emitting tuples of (element index, timestamp, source subtask index).
+  * Timestamps start at {@link #INIT_TIMESTAMP} and increase by one per element.
+  * When {@code unBounded} is true the source stays alive after emitting all elements
+  * (sleeping in a loop) until {@link #cancel()} is called.
+  */
+ static class KafkaSourceFunction extends RichParallelSourceFunction<Tuple3<Integer, Long, Integer>> {
+ private volatile boolean running = true;
+ private final int numElementsPerProducer;
+ private final boolean unBounded;
+
+ // Unbounded by default: the source keeps the job running until cancelled.
+ KafkaSourceFunction(int numElementsPerProducer) {
+ this.numElementsPerProducer = numElementsPerProducer;
+ this.unBounded = true;
+ }
+
+ KafkaSourceFunction(int numElementsPerProducer, boolean unBounded) {
+ this.numElementsPerProducer = numElementsPerProducer;
+ this.unBounded = unBounded;
+ }
+
+ @Override
+ public void run(SourceContext<Tuple3<Integer, Long, Integer>> ctx) throws Exception{
+ long timestamp = INIT_TIMESTAMP;
+ int sourceInstanceId = getRuntimeContext().getIndexOfThisSubtask();
+ for (int i = 0; i < numElementsPerProducer && running; i++) {
+ ctx.collect(new Tuple3<>(i, timestamp++, sourceInstanceId));
+ }
+
+ // Keep the source alive without emitting so the job does not finish on its own.
+ while (running && unBounded) {
+ Thread.sleep(100);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
+ /**
+  * Convenience overload of {@link #createKafkaShuffle(StreamExecutionEnvironment, String, int,
+  * int, TimeCharacteristic, int, boolean)} with watermark randomness disabled.
+  */
+ static KeyedStream<Tuple3<Integer, Long, Integer>, Tuple> createKafkaShuffle(
+ StreamExecutionEnvironment env,
+ String topic,
+ int numElementsPerProducer,
+ int producerParallelism,
+ TimeCharacteristic timeCharacteristic,
+ int numberOfPartitions) {
+ return createKafkaShuffle(
+ env,
+ topic,
+ numElementsPerProducer,
+ producerParallelism,
+ timeCharacteristic,
+ numberOfPartitions,
+ false);
+ }
+
+ /**
+  * Builds the test pipeline: parallel {@link KafkaSourceFunction} -> (event time only)
+  * punctuated timestamp/watermark assignment -> {@link FlinkKafkaShuffle#persistentKeyBy},
+  * keyed on tuple field 0 and persisted through the given Kafka topic.
+  *
+  * @param randomness if true, watermarks are shifted by a small seeded random offset
+  */
+ static KeyedStream<Tuple3<Integer, Long, Integer>, Tuple> createKafkaShuffle(
+ StreamExecutionEnvironment env,
+ String topic,
+ int numElementsPerProducer,
+ int producerParallelism,
+ TimeCharacteristic timeCharacteristic,
+ int numberOfPartitions,
+ boolean randomness) {
+ DataStream<Tuple3<Integer, Long, Integer>> source =
+ env.addSource(new KafkaSourceFunction(numElementsPerProducer)).setParallelism(producerParallelism);
+ // Only event time needs explicit watermark assignment; otherwise use the source as-is.
+ DataStream<Tuple3<Integer, Long, Integer>> input = (timeCharacteristic == EventTime) ?
+ source.assignTimestampsAndWatermarks(new PunctuatedExtractor(randomness)).setParallelism(producerParallelism) : source;
+
+ return FlinkKafkaShuffle.persistentKeyBy(
+ input,
+ topic,
+ producerParallelism,
+ numberOfPartitions,
+ kafkaServer.getStandardProperties(),
+ 0);
+ }
+
+ /**
+  * Uses the tuple's f1 field as the event timestamp and emits a punctuated watermark
+  * per element. With {@code randomness} enabled the watermark is shifted forward by a
+  * pseudo-random offset in [0, 10), drawn from a fixed-seed generator for reproducibility.
+  */
+ static class PunctuatedExtractor implements AssignerWithPunctuatedWatermarks<Tuple3<Integer, Long, Integer>> {
+ private static final long serialVersionUID = 1L;
+ boolean randomness;
+ Random rnd = new Random(123); // fixed seed keeps test runs reproducible
+
+ PunctuatedExtractor() {
+ randomness = false;
+ }
+
+ PunctuatedExtractor(boolean randomness) {
+ this.randomness = randomness;
+ }
+
+ @Override
+ public long extractTimestamp(Tuple3<Integer, Long, Integer> element, long previousTimestamp) {
+ return element.f1;
+ }
+
+ @Override
+ public Watermark checkAndGetNextWatermark(Tuple3<Integer, Long, Integer> lastElement, long extractedTimestamp) {
+ long randomValue = randomness ? rnd.nextInt(10) : 0;
+ return new Watermark(extractedTimestamp + randomValue);
+ }
+ }
+
+ /**
+  * Verifies that each element arrives at the subtask that Kafka's partition assignment
+  * maps its expected partition to, and that any one subtask only ever sees elements
+  * of a single partition. Throws on the first violation.
+  */
+ static class PartitionValidator
+ extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>> {
+ private final KeySelector<Tuple3<Integer, Long, Integer>, Tuple> keySelector;
+ private final int numberOfPartitions;
+ private final String topic;
+
+ // Partition computed for the previous element; -1 until the first element arrives.
+ private int previousPartition;
+
+ PartitionValidator(
+ KeySelector<Tuple3<Integer, Long, Integer>, Tuple> keySelector,
+ int numberOfPartitions,
+ String topic) {
+ this.keySelector = keySelector;
+ this.numberOfPartitions = numberOfPartitions;
+ this.topic = topic;
+ this.previousPartition = -1;
+ }
+
+ @Override
+ public void processElement(
+ Tuple3<Integer, Long, Integer> in,
+ Context ctx,
+ Collector<Tuple3<Integer, Long, Integer>> out) throws Exception {
+ // Recompute the partition the element's key should have been routed to.
+ int expectedPartition = KeyGroupRangeAssignment
+ .assignKeyToParallelOperator(keySelector.getKey(in), numberOfPartitions, numberOfPartitions);
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ KafkaTopicPartition partition = new KafkaTopicPartition(topic, expectedPartition);
+
+ // This is how Kafka assigns a partition to a subtask; mirror it to get the expected owner.
+ boolean rightAssignment =
+ KafkaTopicPartitionAssigner.assign(partition, numberOfPartitions) == indexOfThisSubtask;
+ boolean samePartition = (previousPartition == expectedPartition) || (previousPartition == -1);
+ previousPartition = expectedPartition;
+
+ if (!(rightAssignment && samePartition)) {
+ throw new Exception("Error: Kafka partition assignment error ");
+ }
+ out.collect(in);
+ }
+ }
+
+ /**
+  * Fails the job if the watermark observed through the timer service ever decreases.
+  * Equal consecutive watermarks are allowed (see the comment in processElement).
+  */
+ static class WatermarkValidator
+ extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>> {
+ private long previousWatermark = Long.MIN_VALUE; // initial watermark obtained from the timer service
+
+ @Override
+ public void processElement(
+ Tuple3<Integer, Long, Integer> in,
+ Context ctx,
+ Collector<Tuple3<Integer, Long, Integer>> out) throws Exception {
+
+ long watermark = ctx.timerService().currentWatermark();
+
+ // Notice that the timerService might not be updated if no new watermark has been emitted, hence equivalent
+ // watermark is allowed, strictly incremental check is done when fetching watermark from KafkaShuffleFetcher.
+ if (watermark < previousWatermark) {
+ throw new Exception(
+ "Error: watermark should always increase. current watermark : previous watermark ["
+ + watermark + " : " + previousWatermark + "]");
+ }
+ previousWatermark = watermark;
+
+ out.collect(in);
+ }
+ }
+
+ /**
+  * Pass-through map that throws {@link SuccessException} once exactly {@code totalCount}
+  * elements have been seen, which terminates the test job successfully.
+  */
+ static class ElementCountNoLessThanValidator
+ implements MapFunction<Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>> {
+ private final int totalCount;
+ private int counter = 0;
+
+ ElementCountNoLessThanValidator(int totalCount) {
+ this.totalCount = totalCount;
+ }
+
+ @Override
+ public Tuple3<Integer, Long, Integer> map(Tuple3<Integer, Long, Integer> element) throws Exception {
+ counter++;
+
+ if (counter == totalCount) {
+ throw new SuccessException();
+ }
+
+ return element;
+ }
+ }
+
+ /**
+  * Pass-through map that fails the job as soon as more than {@code totalCount}
+  * elements have been seen (e.g. duplicates under exactly-once expectations).
+  */
+ static class ElementCountNoMoreThanValidator
+ implements MapFunction<Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>> {
+ private final int totalCount;
+ private int counter = 0;
+
+ ElementCountNoMoreThanValidator(int totalCount) {
+ this.totalCount = totalCount;
+ }
+
+ @Override
+ public Tuple3<Integer, Long, Integer> map(Tuple3<Integer, Long, Integer> element) throws Exception {
+ counter++;
+
+ if (counter > totalCount) {
+ throw new Exception("Error: number of elements more than expected");
+ }
+
+ return element;
+ }
+ }
+
+ // Derives a per-time-characteristic topic name so tests do not share Kafka topics.
+ String topic(String prefix, TimeCharacteristic timeCharacteristic) {
+ return prefix + "_" + timeCharacteristic;
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-kafka/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..863665c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/resources/log4j2-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set the root logger level to OFF to avoid flooding build logs.
+# Set it manually to INFO for debugging purposes.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+logger.kafka.name = kafka
+logger.kafka.level = OFF
+logger.kafka2.name = state.change
+logger.kafka2.level = OFF
+
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = OFF
+logger.I0Itec.name = org.I0Itec
+logger.I0Itec.level = OFF