Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/18 12:58:46 UTC

[flink] 05/05: [FLINK-15670] Kafka Shuffle Test Case + add log4j2 file

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37f6db04de7c0bc092fa07e821f7082549b167ef
Author: Yuan Mei <yu...@gmail.com>
AuthorDate: Mon May 18 17:06:03 2020 +0800

    [FLINK-15670] Kafka Shuffle Test Case + add log4j2 file
    
    KafkaShuffle provides a transparent Kafka source and sink pair, through which the network traffic of a shuffle step is persisted and redirected.
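    
    A rough sketch of the intended usage, based on the API exercised by the
    tests in this commit (signature and argument order are taken from
    KafkaShuffleTestBase#createKafkaShuffle; the topic name and parallelism
    values below are illustrative only):
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties kafkaProperties = new Properties(); // bootstrap.servers etc.
        DataStream<Tuple3<Integer, Long, Integer>> input =
            env.addSource(new KafkaSourceFunction(1000)).setParallelism(2);
        // The shuffle writes the keyed stream to the topic and reads it back,
        // so the shuffle step is persisted and can be replayed on failure.
        KeyedStream<Tuple3<Integer, Long, Integer>, Tuple> keyed =
            FlinkKafkaShuffle.persistentKeyBy(
                input,            // stream to shuffle
                "shuffle_topic",  // Kafka topic backing the shuffle
                2,                // producer parallelism (writing side)
                3,                // number of topic partitions (reading side)
                kafkaProperties,  // standard Kafka client properties
                0);               // position of the key field in the tuple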
---
 .../shuffle/KafkaShuffleExactlyOnceITCase.java     | 205 +++++++++
 .../kafka/shuffle/KafkaShuffleITCase.java          | 476 +++++++++++++++++++++
 .../kafka/shuffle/KafkaShuffleTestBase.java        | 269 ++++++++++++
 .../src/test/resources/log4j2-test.properties      |  38 ++
 4 files changed, 988 insertions(+)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java
new file mode 100644
index 0000000..8406862
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.IngestionTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+/**
+ * Failure Recovery IT Test for KafkaShuffle.
+ */
+public class KafkaShuffleExactlyOnceITCase extends KafkaShuffleTestBase {
+
+	@Rule
+	public final Timeout timeout = Timeout.millis(600000L);
+
+	/**
+	 * Failure Recovery after processing 2/3 of the data with time characteristic: ProcessingTime.
+	 *
+	 * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+	 */
+	@Test
+	public void testFailureRecoveryProcessingTime() throws Exception {
+		testKafkaShuffleFailureRecovery(1000, ProcessingTime);
+	}
+
+	/**
+	 * Failure Recovery after processing 2/3 of the data with time characteristic: IngestionTime.
+	 *
+	 * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+	 */
+	@Test
+	public void testFailureRecoveryIngestionTime() throws Exception {
+		testKafkaShuffleFailureRecovery(1000, IngestionTime);
+	}
+
+	/**
+	 * Failure Recovery after processing 2/3 of the data with time characteristic: EventTime.
+	 *
+	 * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+	 */
+	@Test
+	public void testFailureRecoveryEventTime() throws Exception {
+		testKafkaShuffleFailureRecovery(1000, EventTime);
+	}
+
+	/**
+	 * Failure Recovery after data is repartitioned with time characteristic: ProcessingTime.
+	 *
+	 * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+	 */
+	@Test
+	public void testAssignedToPartitionFailureRecoveryProcessingTime() throws Exception {
+		testAssignedToPartitionFailureRecovery(500, ProcessingTime);
+	}
+
+	/**
+	 * Failure Recovery after data is repartitioned with time characteristic: IngestionTime.
+	 *
+	 * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+	 */
+	@Test
+	public void testAssignedToPartitionFailureRecoveryIngestionTime() throws Exception {
+		testAssignedToPartitionFailureRecovery(500, IngestionTime);
+	}
+
+	/**
+	 * Failure Recovery after data is repartitioned with time characteristic: EventTime.
+	 *
+	 * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+	 */
+	@Test
+	public void testAssignedToPartitionFailureRecoveryEventTime() throws Exception {
+		testAssignedToPartitionFailureRecovery(500, EventTime);
+	}
+
+	/**
+	 * To test failure recovery after processing 2/3 of the data.
+	 *
+	 * <p>Schema: (key, timestamp, source instance Id).
+	 * Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1
+	 */
+	private void testKafkaShuffleFailureRecovery(
+			int numElementsPerProducer,
+			TimeCharacteristic timeCharacteristic) throws Exception {
+
+		String topic = topic("failure_recovery", timeCharacteristic);
+		final int numberOfPartitions = 1;
+		final int producerParallelism = 1;
+		final int failAfterElements = numElementsPerProducer * numberOfPartitions * 2 / 3;
+
+		createTestTopic(topic, numberOfPartitions, 1);
+
+		final StreamExecutionEnvironment env =
+			createEnvironment(producerParallelism, timeCharacteristic).enableCheckpointing(500);
+
+		createKafkaShuffle(
+			env, topic, numElementsPerProducer, producerParallelism, timeCharacteristic, numberOfPartitions)
+			.map(new FailingIdentityMapper<>(failAfterElements)).setParallelism(1)
+			.map(new ToInteger(producerParallelism)).setParallelism(1)
+			.addSink(new ValidatingExactlyOnceSink(numElementsPerProducer * producerParallelism)).setParallelism(1);
+
+		FailingIdentityMapper.failedBefore = false;
+
+		tryExecute(env, topic);
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * To test failure recovery with partition assignment after processing 2/3 of the data.
+	 *
+	 * <p>Schema: (key, timestamp, source instance Id).
+	 * Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3
+	 */
+	private void testAssignedToPartitionFailureRecovery(
+			int numElementsPerProducer,
+			TimeCharacteristic timeCharacteristic) throws Exception {
+		String topic = topic("partition_failure_recovery", timeCharacteristic);
+		final int numberOfPartitions = 3;
+		final int producerParallelism = 2;
+		final int failAfterElements = numElementsPerProducer * producerParallelism * 2 / 3;
+
+		createTestTopic(topic, numberOfPartitions, 1);
+
+		final StreamExecutionEnvironment env = createEnvironment(producerParallelism, timeCharacteristic);
+
+		KeyedStream<Tuple3<Integer, Long, Integer>, Tuple> keyedStream = createKafkaShuffle(
+			env,
+			topic,
+			numElementsPerProducer,
+			producerParallelism,
+			timeCharacteristic,
+			numberOfPartitions);
+		keyedStream
+			.process(new PartitionValidator(keyedStream.getKeySelector(), numberOfPartitions, topic))
+			.setParallelism(numberOfPartitions)
+			.map(new ToInteger(producerParallelism)).setParallelism(numberOfPartitions)
+			.map(new FailingIdentityMapper<>(failAfterElements)).setParallelism(1)
+			.addSink(new ValidatingExactlyOnceSink(numElementsPerProducer * producerParallelism)).setParallelism(1);
+
+		FailingIdentityMapper.failedBefore = false;
+
+		tryExecute(env, topic);
+
+		deleteTestTopic(topic);
+	}
+
+	private StreamExecutionEnvironment createEnvironment(
+			int producerParallelism,
+			TimeCharacteristic timeCharacteristic) {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(producerParallelism);
+		env.setStreamTimeCharacteristic(timeCharacteristic);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+		env.setBufferTimeout(0);
+		env.enableCheckpointing(500);
+
+		return env;
+	}
+
+	private static class ToInteger implements MapFunction<Tuple3<Integer, Long, Integer>, Integer> {
+		private final int producerParallelism;
+
+		ToInteger(int producerParallelism) {
+			this.producerParallelism = producerParallelism;
+		}
+
+		@Override
+		public Integer map(Tuple3<Integer, Long, Integer> element) throws Exception {
+			return element.f0 * producerParallelism + element.f2;
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
new file mode 100644
index 0000000..805f7ef
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElement;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElementDeserializer;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleRecord;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleWatermark;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.IngestionTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.fail;
+
+/**
+ * Simple end-to-end test for KafkaShuffle.
+ */
+public class KafkaShuffleITCase extends KafkaShuffleTestBase {
+
+	@Rule
+	public final Timeout timeout = Timeout.millis(600000L);
+
+	/**
+	 * To test that no data is lost or duplicated end-to-end with the default time characteristic: ProcessingTime.
+	 *
+	 * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+	 */
+	@Test
+	public void testSimpleProcessingTime() throws Exception {
+		testKafkaShuffle(200000, ProcessingTime);
+	}
+
+	/**
+	 * To test that no data is lost or duplicated end-to-end with time characteristic: IngestionTime.
+	 *
+	 * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+	 */
+	@Test
+	public void testSimpleIngestionTime() throws Exception {
+		testKafkaShuffle(200000, IngestionTime);
+	}
+
+	/**
+	 * To test that no data is lost or duplicated end-to-end with time characteristic: EventTime.
+	 *
+	 * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+	 */
+	@Test
+	public void testSimpleEventTime() throws Exception {
+		testKafkaShuffle(100000, EventTime);
+	}
+
+	/**
+	 * To test that data is assigned to the correct partition with time characteristic: ProcessingTime.
+	 *
+	 * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+	 */
+	@Test
+	public void testAssignedToPartitionProcessingTime() throws Exception {
+		testAssignedToPartition(300000, ProcessingTime);
+	}
+
+	/**
+	 * To test that data is assigned to the correct partition with time characteristic: IngestionTime.
+	 *
+	 * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+	 */
+	@Test
+	public void testAssignedToPartitionIngestionTime() throws Exception {
+		testAssignedToPartition(300000, IngestionTime);
+	}
+
+	/**
+	 * To test that data is assigned to the correct partition with time characteristic: EventTime.
+	 *
+	 * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+	 */
+	@Test
+	public void testAssignedToPartitionEventTime() throws Exception {
+		testAssignedToPartition(100000, EventTime);
+	}
+
+	/**
+	 * To test that watermarks increase monotonically, with randomized watermark generation.
+	 *
+	 * <p>Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3.
+	 */
+	@Test
+	public void testWatermarkIncremental() throws Exception {
+		testWatermarkIncremental(100000);
+	}
+
+	/**
+	 * To test value serialization and deserialization with time characteristic: ProcessingTime.
+	 *
+	 * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+	 */
+	@Test
+	public void testSerDeProcessingTime() throws Exception {
+		testRecordSerDe(ProcessingTime);
+	}
+
+	/**
+	 * To test value and watermark serialization and deserialization with time characteristic: IngestionTime.
+	 *
+	 * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+	 */
+	@Test
+	public void testSerDeIngestionTime() throws Exception {
+		testRecordSerDe(IngestionTime);
+	}
+
+	/**
+	 * To test value and watermark serialization and deserialization with time characteristic: EventTime.
+	 *
+	 * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+	 */
+	@Test
+	public void testSerDeEventTime() throws Exception {
+		testRecordSerDe(EventTime);
+	}
+
+	/**
+	 * To test value and watermark serialization and deserialization with time characteristic: EventTime.
+	 *
+	 * <p>Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1.
+	 */
+	@Test
+	public void testWatermarkBroadcasting() throws Exception {
+		final int numberOfPartitions = 3;
+		final int producerParallelism = 2;
+		final int numElementsPerProducer = 1000;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		Map<Integer, Collection<ConsumerRecord<byte[], byte[]>>> results = testKafkaShuffleProducer(
+			topic("test_watermark_broadcast", EventTime),
+			env,
+			numberOfPartitions,
+			producerParallelism,
+			numElementsPerProducer,
+			EventTime);
+		TypeSerializer<Tuple3<Integer, Long, Integer>> typeSerializer = createTypeSerializer(env);
+		KafkaShuffleElementDeserializer<Tuple3<Integer, Long, Integer>> deserializer = new KafkaShuffleElementDeserializer<>(typeSerializer);
+
+		// Records in a single partition are kept in order
+		for (int p = 0; p < numberOfPartitions; p++) {
+			Collection<ConsumerRecord<byte[], byte[]>> records = results.get(p);
+			Map<Integer, List<KafkaShuffleWatermark>> watermarks = new HashMap<>();
+
+			for (ConsumerRecord<byte[], byte[]> consumerRecord : records) {
+				Assert.assertNull(consumerRecord.key());
+				KafkaShuffleElement element = deserializer.deserialize(consumerRecord);
+				if (element.isRecord()) {
+					KafkaShuffleRecord<Tuple3<Integer, Long, Integer>> record = element.asRecord();
+					Assert.assertEquals(record.getValue().f1.longValue(), INIT_TIMESTAMP + record.getValue().f0);
+					Assert.assertEquals(record.getTimestamp().longValue(), record.getValue().f1.longValue());
+				} else if (element.isWatermark()) {
+					KafkaShuffleWatermark watermark = element.asWatermark();
+					watermarks.computeIfAbsent(watermark.getSubtask(), k -> new ArrayList<>()).add(watermark);
+				} else {
+					fail("KafkaShuffleElement is either record or watermark");
+				}
+			}
+
+			// According to how watermarks are generated in this IT case,
+			// every producer task emits one watermark per record, plus the end-of-event-time watermark.
+			// Hence each producer sub task generates `numElementsPerProducer + 1` watermarks
+			// and broadcasts them to all partitions,
+			// so in total each producer sub task emits `(numElementsPerProducer + 1) * numberOfPartitions` watermarks.
+			// On the consumer side, each partition receives `(numElementsPerProducer + 1) * producerParallelism` watermarks,
+			// `numElementsPerProducer + 1` from each producer sub task.
+			// Moreover, watermarks from the same producer sub task must stay in order.
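+			// For example, with `numElementsPerProducer = 1000` and `producerParallelism = 2` as set above,
+			// each partition should see (1000 + 1) * 2 = 2002 watermarks, 1001 per producer sub task.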
+			for (List<KafkaShuffleWatermark> subTaskWatermarks : watermarks.values()) {
+				int index = 0;
+				Assert.assertEquals(numElementsPerProducer + 1, subTaskWatermarks.size());
+				for (KafkaShuffleWatermark watermark : subTaskWatermarks) {
+					if (index == numElementsPerProducer) {
+						// the last element is the watermark that signifies end-of-event-time
+						Assert.assertEquals(watermark.getWatermark(), Watermark.MAX_WATERMARK.getTimestamp());
+					} else {
+						Assert.assertEquals(watermark.getWatermark(), INIT_TIMESTAMP + index++);
+					}
+				}
+			}
+		}
+	}
+
+	/**
+	 * To test that no data is lost or duplicated end-to-end.
+	 *
+	 * <p>Schema: (key, timestamp, source instance Id).
+	 * Producer Parallelism = 1; Kafka Partition # = 1; Consumer Parallelism = 1
+	 */
+	private void testKafkaShuffle(
+			int numElementsPerProducer,
+			TimeCharacteristic timeCharacteristic) throws Exception {
+		String topic = topic("test_simple", timeCharacteristic);
+		final int numberOfPartitions = 1;
+		final int producerParallelism = 1;
+
+		createTestTopic(topic, numberOfPartitions, 1);
+
+		final StreamExecutionEnvironment env = createEnvironment(producerParallelism, timeCharacteristic);
+		createKafkaShuffle(
+				env,
+				topic,
+				numElementsPerProducer,
+				producerParallelism,
+				timeCharacteristic,
+				numberOfPartitions)
+			.map(new ElementCountNoMoreThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1)
+			.map(new ElementCountNoLessThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1);
+
+		tryExecute(env, topic);
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * To test that data is assigned to the correct partition.
+	 *
+	 * <p>Schema: (key, timestamp, source instance Id).
+	 * Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3
+	 */
+	private void testAssignedToPartition(
+			int numElementsPerProducer,
+			TimeCharacteristic timeCharacteristic) throws Exception {
+		String topic = topic("test_assigned_to_partition", timeCharacteristic);
+		final int numberOfPartitions = 3;
+		final int producerParallelism = 2;
+
+		createTestTopic(topic, numberOfPartitions, 1);
+
+		final StreamExecutionEnvironment env = createEnvironment(producerParallelism, timeCharacteristic);
+
+		KeyedStream<Tuple3<Integer, Long, Integer>, Tuple> keyedStream = createKafkaShuffle(
+			env,
+			topic,
+			numElementsPerProducer,
+			producerParallelism,
+			timeCharacteristic,
+			numberOfPartitions);
+		keyedStream
+			.process(new PartitionValidator(keyedStream.getKeySelector(), numberOfPartitions, topic))
+			.setParallelism(numberOfPartitions)
+			.map(new ElementCountNoMoreThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1)
+			.map(new ElementCountNoLessThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1);
+
+		tryExecute(env, topic);
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * To test that the watermark on the consumer side never decreases.
+	 *
+	 * <p>Schema: (key, timestamp, source instance Id).
+	 * Producer Parallelism = 2; Kafka Partition # = 3; Consumer Parallelism = 3
+	 */
+	private void testWatermarkIncremental(int numElementsPerProducer) throws Exception {
+		TimeCharacteristic timeCharacteristic = EventTime;
+		String topic = topic("test_watermark_incremental", timeCharacteristic);
+		final int numberOfPartitions = 3;
+		final int producerParallelism = 2;
+
+		createTestTopic(topic, numberOfPartitions, 1);
+
+		final StreamExecutionEnvironment env = createEnvironment(producerParallelism, timeCharacteristic);
+
+		KeyedStream<Tuple3<Integer, Long, Integer>, Tuple> keyedStream = createKafkaShuffle(
+			env,
+			topic,
+			numElementsPerProducer,
+			producerParallelism,
+			timeCharacteristic,
+			numberOfPartitions,
+			true);
+		keyedStream
+			.process(new WatermarkValidator())
+			.setParallelism(numberOfPartitions)
+			.map(new ElementCountNoMoreThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1)
+			.map(new ElementCountNoLessThanValidator(numElementsPerProducer * producerParallelism)).setParallelism(1);
+
+		tryExecute(env, topic);
+
+		deleteTestTopic(topic);
+	}
+
+	private void testRecordSerDe(TimeCharacteristic timeCharacteristic) throws Exception {
+		final int numElementsPerProducer = 2000;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// Records in a single partition are kept in order
+		Collection<ConsumerRecord<byte[], byte[]>> records = Iterables.getOnlyElement(
+			testKafkaShuffleProducer(
+				topic("test_serde", timeCharacteristic), env, 1, 1, numElementsPerProducer, timeCharacteristic).values());
+
+		switch (timeCharacteristic) {
+			case ProcessingTime:
+				// NonTimestampContext, no watermark
+				Assert.assertEquals(records.size(), numElementsPerProducer);
+				break;
+			case IngestionTime:
+				// IngestionTime uses AutomaticWatermarkContext, which emits a watermark every `watermarkInterval`
+				// (200ms by default), hence the number of watermarks is difficult to control
+				break;
+			case EventTime:
+				// ManualWatermarkContext
+				// `numElementsPerProducer` records, `numElementsPerProducer` watermarks, and one end-of-event-time watermark
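+				// (here numElementsPerProducer = 2000, so the partition holds 2000 + 2000 + 1 = 4001 consumer records)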
+				Assert.assertEquals(records.size(), numElementsPerProducer * 2 + 1);
+				break;
+			default:
+				fail("unknown TimeCharacteristic type");
+		}
+
+		TypeSerializer<Tuple3<Integer, Long, Integer>> typeSerializer = createTypeSerializer(env);
+
+		KafkaShuffleElementDeserializer<Tuple3<Integer, Long, Integer>> deserializer = new KafkaShuffleElementDeserializer<>(typeSerializer);
+
+		int recordIndex = 0;
+		int watermarkIndex = 0;
+		for (ConsumerRecord<byte[], byte[]> consumerRecord : records) {
+			Assert.assertNull(consumerRecord.key());
+			KafkaShuffleElement element = deserializer.deserialize(consumerRecord);
+			if (element.isRecord()) {
+				KafkaShuffleRecord<Tuple3<Integer, Long, Integer>> record = element.asRecord();
+				switch (timeCharacteristic) {
+					case ProcessingTime:
+						Assert.assertNull(record.getTimestamp());
+						break;
+					case IngestionTime:
+						Assert.assertNotNull(record.getTimestamp());
+						break;
+					case EventTime:
+						Assert.assertEquals(record.getTimestamp().longValue(), record.getValue().f1.longValue());
+						break;
+					default:
+						fail("unknown TimeCharacteristic type");
+				}
+				Assert.assertEquals(record.getValue().f0.intValue(), recordIndex);
+				Assert.assertEquals(record.getValue().f1.longValue(), INIT_TIMESTAMP + recordIndex);
+				Assert.assertEquals(record.getValue().f2.intValue(), 0);
+				recordIndex++;
+			} else if (element.isWatermark()) {
+				switch (timeCharacteristic) {
+					case ProcessingTime:
+						fail("Watermarks should not be generated in the case of ProcessingTime");
+						break;
+					case IngestionTime:
+						break;
+					case EventTime:
+						KafkaShuffleWatermark watermark = element.asWatermark();
+						Assert.assertEquals(watermark.getSubtask(), 0);
+						if (watermarkIndex == recordIndex) {
+							// the last element is the watermark that signifies end-of-event-time
+							Assert.assertEquals(watermark.getWatermark(), Watermark.MAX_WATERMARK.getTimestamp());
+						} else {
+							Assert.assertEquals(watermark.getWatermark(), INIT_TIMESTAMP + watermarkIndex);
+						}
+						break;
+					default:
+						fail("unknown TimeCharacteristic type");
+				}
+				watermarkIndex++;
+			} else {
+				fail("KafkaShuffleElement is either record or watermark");
+			}
+		}
+	}
+
+	private Map<Integer, Collection<ConsumerRecord<byte[], byte[]>>> testKafkaShuffleProducer(
+			String topic,
+			StreamExecutionEnvironment env,
+			int numberOfPartitions,
+			int producerParallelism,
+			int numElementsPerProducer,
+			TimeCharacteristic timeCharacteristic) throws Exception {
+		createTestTopic(topic, numberOfPartitions, 1);
+
+		env.setParallelism(producerParallelism);
+		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.setStreamTimeCharacteristic(timeCharacteristic);
+
+		DataStream<Tuple3<Integer, Long, Integer>> source =
+			env.addSource(new KafkaSourceFunction(numElementsPerProducer, false)).setParallelism(producerParallelism);
+		DataStream<Tuple3<Integer, Long, Integer>> input = (timeCharacteristic == EventTime) ?
+			source.assignTimestampsAndWatermarks(new PunctuatedExtractor()).setParallelism(producerParallelism) : source;
+
+		Properties properties = kafkaServer.getStandardProperties();
+		Properties kafkaProperties = PropertiesUtil.flatten(properties);
+
+		kafkaProperties.setProperty(PRODUCER_PARALLELISM, String.valueOf(producerParallelism));
+		kafkaProperties.setProperty(PARTITION_NUMBER, String.valueOf(numberOfPartitions));
+		kafkaProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+		kafkaProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+		FlinkKafkaShuffle.writeKeyBy(input, topic, kafkaProperties, 0);
+
+		env.execute("Write to " + topic);
+		ImmutableMap.Builder<Integer, Collection<ConsumerRecord<byte[], byte[]>>> results = ImmutableMap.builder();
+
+		for (int p = 0; p < numberOfPartitions; p++) {
+			results.put(p, kafkaServer.getAllRecordsFromTopic(kafkaProperties, topic, p, 5000));
+		}
+
+		deleteTestTopic(topic);
+
+		return results.build();
+	}
+
+	private StreamExecutionEnvironment createEnvironment(
+			int producerParallelism,
+			TimeCharacteristic timeCharacteristic) {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(producerParallelism);
+		env.setStreamTimeCharacteristic(timeCharacteristic);
+		env.setRestartStrategy(RestartStrategies.noRestart());
+
+		return env;
+	}
+
+	private TypeSerializer<Tuple3<Integer, Long, Integer>> createTypeSerializer(StreamExecutionEnvironment env) {
+		return new TupleTypeInfo<Tuple3<Integer, Long, Integer>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO)
+			.createSerializer(env.getConfig());
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleTestBase.java
new file mode 100644
index 0000000..a42b151
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleTestBase.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+
+import org.junit.BeforeClass;
+
+import java.util.Random;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+
+/**
+ * Base Test Class for KafkaShuffle.
+ */
+public class KafkaShuffleTestBase extends KafkaConsumerTestBase {
+	static final long INIT_TIMESTAMP = System.currentTimeMillis();
+
+	@BeforeClass
+	public static void prepare() throws Exception {
+		KafkaProducerTestBase.prepare();
+		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
+	}
+
+	static class KafkaSourceFunction extends RichParallelSourceFunction<Tuple3<Integer, Long, Integer>> {
+		private volatile boolean running = true;
+		private final int numElementsPerProducer;
+		private final boolean unbounded;
+
+		KafkaSourceFunction(int numElementsPerProducer) {
+			this.numElementsPerProducer = numElementsPerProducer;
+			this.unbounded = true;
+		}
+
+		KafkaSourceFunction(int numElementsPerProducer, boolean unbounded) {
+			this.numElementsPerProducer = numElementsPerProducer;
+			this.unbounded = unbounded;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple3<Integer, Long, Integer>> ctx) throws Exception {
+			long timestamp = INIT_TIMESTAMP;
+			int sourceInstanceId = getRuntimeContext().getIndexOfThisSubtask();
+			for (int i = 0; i < numElementsPerProducer && running; i++) {
+				ctx.collect(new Tuple3<>(i, timestamp++, sourceInstanceId));
+			}
+
+			while (running && unbounded) {
+				Thread.sleep(100);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	static KeyedStream<Tuple3<Integer, Long, Integer>, Tuple> createKafkaShuffle(
+			StreamExecutionEnvironment env,
+			String topic,
+			int numElementsPerProducer,
+			int producerParallelism,
+			TimeCharacteristic timeCharacteristic,
+			int numberOfPartitions) {
+		return createKafkaShuffle(
+			env,
+			topic,
+			numElementsPerProducer,
+			producerParallelism,
+			timeCharacteristic,
+			numberOfPartitions,
+			false);
+	}
+
+	static KeyedStream<Tuple3<Integer, Long, Integer>, Tuple> createKafkaShuffle(
+			StreamExecutionEnvironment env,
+			String topic,
+			int numElementsPerProducer,
+			int producerParallelism,
+			TimeCharacteristic timeCharacteristic,
+			int numberOfPartitions,
+			boolean randomness) {
+		DataStream<Tuple3<Integer, Long, Integer>> source =
+			env.addSource(new KafkaSourceFunction(numElementsPerProducer)).setParallelism(producerParallelism);
+		DataStream<Tuple3<Integer, Long, Integer>> input = (timeCharacteristic == EventTime) ?
+			source.assignTimestampsAndWatermarks(new PunctuatedExtractor(randomness)).setParallelism(producerParallelism) : source;
+
+		return FlinkKafkaShuffle.persistentKeyBy(
+			input,
+			topic,
+			producerParallelism,
+			numberOfPartitions,
+			kafkaServer.getStandardProperties(),
+			0);
+	}
+
+	static class PunctuatedExtractor implements AssignerWithPunctuatedWatermarks<Tuple3<Integer, Long, Integer>> {
+		private static final long serialVersionUID = 1L;
+		boolean randomness;
+		Random rnd = new Random(123);
+
+		PunctuatedExtractor() {
+			randomness = false;
+		}
+
+		PunctuatedExtractor(boolean randomness) {
+			this.randomness = randomness;
+		}
+
+		@Override
+		public long extractTimestamp(Tuple3<Integer, Long, Integer> element, long previousTimestamp) {
+			return element.f1;
+		}
+
+		@Override
+		public Watermark checkAndGetNextWatermark(Tuple3<Integer, Long, Integer> lastElement, long extractedTimestamp) {
+			long randomValue = randomness ? rnd.nextInt(10) : 0;
+			return new Watermark(extractedTimestamp + randomValue);
+		}
+	}
+
+	static class PartitionValidator
+			extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>> {
+		private final KeySelector<Tuple3<Integer, Long, Integer>, Tuple> keySelector;
+		private final int numberOfPartitions;
+		private final String topic;
+
+		private int previousPartition;
+
+		PartitionValidator(
+				KeySelector<Tuple3<Integer, Long, Integer>, Tuple> keySelector,
+				int numberOfPartitions,
+				String topic) {
+			this.keySelector = keySelector;
+			this.numberOfPartitions = numberOfPartitions;
+			this.topic = topic;
+			this.previousPartition = -1;
+		}
+
+		@Override
+		public void processElement(
+				Tuple3<Integer, Long, Integer> in,
+				Context ctx,
+				Collector<Tuple3<Integer, Long, Integer>> out) throws Exception {
+			int expectedPartition = KeyGroupRangeAssignment
+				.assignKeyToParallelOperator(keySelector.getKey(in), numberOfPartitions, numberOfPartitions);
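+			// Note: KeyGroupRangeAssignment hashes the key into a key group (with max parallelism set to the
+			// number of partitions here) and maps that key group to an operator index, used as the partition.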
+			int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+			KafkaTopicPartition partition = new KafkaTopicPartition(topic, expectedPartition);
+
+			// This mirrors how the Flink Kafka consumer assigns partitions to sub tasks.
+			boolean rightAssignment =
+				KafkaTopicPartitionAssigner.assign(partition, numberOfPartitions) == indexOfThisSubtask;
+			boolean samePartition = (previousPartition == expectedPartition) || (previousPartition == -1);
+			previousPartition = expectedPartition;
+
+			if (!(rightAssignment && samePartition)) {
+				throw new Exception("Error: Kafka partition assignment error ");
+			}
+			out.collect(in);
+		}
+	}
+
+	static class WatermarkValidator
+			extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>> {
+		private long previousWatermark = Long.MIN_VALUE;   // initial watermark obtained from the timer service
+
+		@Override
+		public void processElement(
+				Tuple3<Integer, Long, Integer> in,
+				Context ctx,
+				Collector<Tuple3<Integer, Long, Integer>> out) throws Exception {
+
+			long watermark = ctx.timerService().currentWatermark();
+
+			// Notice that the timerService might not be updated if no new watermark has been emitted, hence an equal
+			// watermark is allowed; the strictly increasing check is done when fetching watermarks in KafkaShuffleFetcher.
+			if (watermark < previousWatermark) {
+				throw new Exception(
+					"Error: watermark should always increase. current watermark : previous watermark ["
+						+ watermark + " : " + previousWatermark + "]");
+			}
+			previousWatermark = watermark;
+
+			out.collect(in);
+		}
+	}
+
+	static class ElementCountNoLessThanValidator
+			implements MapFunction<Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>> {
+		private final int totalCount;
+		private int counter = 0;
+
+		ElementCountNoLessThanValidator(int totalCount) {
+			this.totalCount = totalCount;
+		}
+
+		@Override
+		public Tuple3<Integer, Long, Integer> map(Tuple3<Integer, Long, Integer> element) throws Exception {
+			counter++;
+
+			if (counter == totalCount) {
+				throw new SuccessException();
+			}
+
+			return element;
+		}
+	}
+
+	static class ElementCountNoMoreThanValidator
+			implements MapFunction<Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>> {
+		private final int totalCount;
+		private int counter = 0;
+
+		ElementCountNoMoreThanValidator(int totalCount) {
+			this.totalCount = totalCount;
+		}
+
+		@Override
+		public Tuple3<Integer, Long, Integer> map(Tuple3<Integer, Long, Integer> element) throws Exception {
+			counter++;
+
+			if (counter > totalCount) {
+				throw new Exception("Error: number of elements more than expected");
+			}
+
+			return element;
+		}
+	}
+
+	String topic(String prefix, TimeCharacteristic timeCharacteristic) {
+		return prefix + "_" + timeCharacteristic;
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-kafka/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..863665c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/resources/log4j2-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs;
+# set it manually to INFO for debugging purposes.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+logger.kafka.name = kafka
+logger.kafka.level = OFF
+logger.kafka2.name = state.change
+logger.kafka2.level = OFF
+
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = OFF
+logger.I0Itec.name = org.I0Itec
+logger.I0Itec.level = OFF