Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/09 06:06:48 UTC

[1/2] flink git commit: [FLINK-3679] [kafka] Allow Kafka consumer to skip corrupted messages

Repository: flink
Updated Branches:
  refs/heads/master adbf846f2 -> c39ad31f3


[FLINK-3679] [kafka] Allow Kafka consumer to skip corrupted messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afb4c5e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afb4c5e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afb4c5e0

Branch: refs/heads/master
Commit: afb4c5e02c513a82d2ad7f7816065fdd93665e0e
Parents: adbf846
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Mar 2 13:33:13 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Mar 9 14:05:37 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  4 +
 .../connectors/kafka/Kafka09FetcherTest.java    | 84 ++++++++++++++++++++
 .../kafka/internals/AbstractFetcher.java        |  4 +
 3 files changed, 92 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 0f700ab..331c9c7 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -146,6 +146,10 @@ The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into
 `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
 method gets called for each Kafka message, passing the value from Kafka.
 
+There are two possible design choices when the `DeserializationSchema` encounters a corrupted message. It can
+either throw an `IOException` which causes the pipeline to be restarted, or it can return `null` where the Flink
+Kafka consumer will silently skip the corrupted message.
+
 It is usually helpful to start from the `AbstractDeserializationSchema`, which takes care of describing the
 produced Java/Scala type to Flink's type system. Users that implement a vanilla `DeserializationSchema` need
 to implement the `getProducedType(...)` method themselves.
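
As a rough, hypothetical sketch of the two choices described in the documentation change above (the class name and the "parsing" logic are placeholders, and the package names assume the Flink version this commit targets), a schema built on AbstractDeserializationSchema could look like this:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

// Hypothetical schema illustrating both options for handling a corrupted message.
public class TolerantStringSchema extends AbstractDeserializationSchema<String> {

	@Override
	public String deserialize(byte[] message) throws IOException {
		// Placeholder "parsing": treat empty payloads as corrupted.
		if (message == null || message.length == 0) {
			// Option 1: throw an IOException; the job fails and is restarted,
			// and the same record will be read (and may fail) again:
			// throw new IOException("Corrupted message");

			// Option 2: return null; the Flink Kafka consumer skips this record.
			return null;
		}
		return new String(message, StandardCharsets.UTF_8);
	}
}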

http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 49144e6..61a8855 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -29,6 +31,7 @@ import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.CollectingSourceContext;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -49,6 +52,8 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -419,6 +424,85 @@ public class Kafka09FetcherTest {
 		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
 	}
 
+	@Test
+	public void testSkipCorruptedMessage() throws Exception {
+
+		// ----- some test data -----
+
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+			new ConsumerRecord<>(topic, partition, 15, payload, payload),
+			new ConsumerRecord<>(topic, partition, 16, payload, payload),
+			new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- build a fetcher -----
+
+		ArrayList<String> results = new ArrayList<>();
+		SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
+
+			@Override
+			public String deserialize(byte[] messageKey, byte[] message,
+									  String topic, int partition, long offset) throws IOException {
+				return offset == 15 ? null : new String(message);
+			}
+
+			@Override
+			public boolean isEndOfStream(String nextElement) {
+				return "end".equals(nextElement);
+			}
+
+			@Override
+			public TypeInformation<String> getProducedType() {
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			}
+		};
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+			sourceContext,
+			partitionsWithInitialOffsets,
+			null, /* periodic watermark extractor */
+			null, /* punctuated watermark extractor */
+			new TestProcessingTimeService(),
+			10, /* watermark interval */
+			this.getClass().getClassLoader(),
+			true, /* checkpointing */
+			"task_name",
+			new UnregisteredMetricsGroup(),
+			schema,
+			new Properties(),
+			0L,
+			false);
+
+
+		// ----- run the fetcher -----
+
+		fetcher.runFetchLoop();
+		assertEquals(1, results.size());
+	}
+
 	// ------------------------------------------------------------------------
 	//  test utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index e021881..76ce1a0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -213,6 +213,10 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * @param offset The offset of the record
 	 */
 	protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
+		if (record == null) {
+			return;
+		}
+
 		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
 			// fast path logic, in case there are no watermarks
 


[2/2] flink git commit: [FLINK-3679] [kafka] Improve null record handling for FlinkKafkaConsumer

Posted by tz...@apache.org.
[FLINK-3679] [kafka] Improve null record handling for FlinkKafkaConsumer

This commit generally improves null record handling by:
 - also updating the offset in state holders if the record is null
 - moving the null-record-related tests to AbstractFetcherTest, so that
   the behaviour is tested for all fetcher implementations
 - making the docs more informative about the behaviour of the consumer
   when corrupted messages are encountered.

This closes #3314.
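
For a user-facing view of this behaviour, a minimal, hypothetical wiring sketch follows. The topic name, Kafka properties, and the TolerantStringSchema class (from the sketch under the first commit) are assumptions for illustration, not part of this change; the consumer drops records for which deserialize(...) returns null and, with this commit, still advances the offset in its partition state.

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public class SkipCorruptedRecordsExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(5000); // offsets kept in state also cover skipped records

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
		props.setProperty("group.id", "example-group");           // placeholder

		// TolerantStringSchema returns null for records it cannot parse, so the
		// consumer silently skips them but still moves past their offsets.
		env.addSource(new FlinkKafkaConsumer09<>("example-topic", new TolerantStringSchema(), props))
			.print();

		env.execute("skip-corrupted-records-example");
	}
}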


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c39ad31f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c39ad31f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c39ad31f

Branch: refs/heads/master
Commit: c39ad31f3c321a1803abc97b2ba3074561d9af6e
Parents: afb4c5e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Mar 9 01:28:08 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Mar 9 14:05:38 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  14 +-
 .../connectors/kafka/Kafka09FetcherTest.java    |  84 ----
 .../kafka/internals/AbstractFetcher.java        |  57 ++-
 .../kafka/internals/AbstractFetcherTest.java    | 461 +++++++++++++++++++
 .../AbstractFetcherTimestampsTest.java          | 335 --------------
 5 files changed, 503 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 331c9c7..06e40b2 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -146,10 +146,6 @@ The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into
 `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
 method gets called for each Kafka message, passing the value from Kafka.
 
-There are two possible design choices when the `DeserializationSchema` encounters a corrupted message. It can
-either throw an `IOException` which causes the pipeline to be restarted, or it can return `null` where the Flink
-Kafka consumer will silently skip the corrupted message.
-
 It is usually helpful to start from the `AbstractDeserializationSchema`, which takes care of describing the
 produced Java/Scala type to Flink's type system. Users that implement a vanilla `DeserializationSchema` need
 to implement the `getProducedType(...)` method themselves.
@@ -167,6 +163,16 @@ For convenience, Flink provides the following schemas:
     into an ObjectNode object, from which fields can be accessed using objectNode.get("field").as(Int/String/...)().
     The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
     an optional "metadata" field that exposes the offset/partition/topic for this message.
+    
+When encountering a corrupted message that cannot be deserialized for any reason, there
+are two options - either throwing an exception from the `deserialize(...)` method
+which will cause the job to fail and be restarted, or returning `null` to allow
+the Flink Kafka consumer to silently skip the corrupted message. Note that
+due to the consumer's fault tolerance (see below sections for more details),
+failing the job on the corrupted message will let the consumer attempt
+to deserialize the message again. Therefore, if deserialization still fails, the
+consumer will fall into a non-stop restart and fail loop on that corrupted
+message.
 
 ### Kafka Consumers Start Position Configuration
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 61a8855..49144e6 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -31,7 +29,6 @@ import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.util.CollectingSourceContext;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -52,8 +49,6 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -424,85 +419,6 @@ public class Kafka09FetcherTest {
 		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
 	}
 
-	@Test
-	public void testSkipCorruptedMessage() throws Exception {
-
-		// ----- some test data -----
-
-		final String topic = "test-topic";
-		final int partition = 3;
-		final byte[] payload = new byte[] {1, 2, 3, 4};
-
-		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-			new ConsumerRecord<>(topic, partition, 15, payload, payload),
-			new ConsumerRecord<>(topic, partition, 16, payload, payload),
-			new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
-
-		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
-		data.put(new TopicPartition(topic, partition), records);
-
-		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
-
-		// ----- the test consumer -----
-
-		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-			@Override
-			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
-				return consumerRecords;
-			}
-		});
-
-		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-		// ----- build a fetcher -----
-
-		ArrayList<String> results = new ArrayList<>();
-		SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
-		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
-			Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
-		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
-
-			@Override
-			public String deserialize(byte[] messageKey, byte[] message,
-									  String topic, int partition, long offset) throws IOException {
-				return offset == 15 ? null : new String(message);
-			}
-
-			@Override
-			public boolean isEndOfStream(String nextElement) {
-				return "end".equals(nextElement);
-			}
-
-			@Override
-			public TypeInformation<String> getProducedType() {
-				return BasicTypeInfo.STRING_TYPE_INFO;
-			}
-		};
-
-		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
-			sourceContext,
-			partitionsWithInitialOffsets,
-			null, /* periodic watermark extractor */
-			null, /* punctuated watermark extractor */
-			new TestProcessingTimeService(),
-			10, /* watermark interval */
-			this.getClass().getClassLoader(),
-			true, /* checkpointing */
-			"task_name",
-			new UnregisteredMetricsGroup(),
-			schema,
-			new Properties(),
-			0L,
-			false);
-
-
-		// ----- run the fetcher -----
-
-		fetcher.runFetchLoop();
-		assertEquals(1, results.size());
-	}
-
 	// ------------------------------------------------------------------------
 	//  test utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 76ce1a0..b8ac980 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -213,26 +213,28 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * @param offset The offset of the record
 	 */
 	protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
-		if (record == null) {
-			return;
-		}
 
-		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-			// fast path logic, in case there are no watermarks
+		if (record != null) {
+			if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+				// fast path logic, in case there are no watermarks
 
-			// emit the record, using the checkpoint lock to guarantee
-			// atomicity of record emission and offset state update
+				// emit the record, using the checkpoint lock to guarantee
+				// atomicity of record emission and offset state update
+				synchronized (checkpointLock) {
+					sourceContext.collect(record);
+					partitionState.setOffset(offset);
+				}
+			} else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+				emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
+			} else {
+				emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
+			}
+		} else {
+			// if the record is null, simply just update the offset state for partition
 			synchronized (checkpointLock) {
-				sourceContext.collect(record);
 				partitionState.setOffset(offset);
 			}
 		}
-		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
-		}
-		else {
-			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
-		}
 	}
 
 	/**
@@ -248,22 +250,27 @@ public abstract class AbstractFetcher<T, KPH> {
 	protected void emitRecordWithTimestamp(
 			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
 
-		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-			// fast path logic, in case there are no watermarks generated in the fetcher
+		if (record != null) {
+			if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+				// fast path logic, in case there are no watermarks generated in the fetcher
 
-			// emit the record, using the checkpoint lock to guarantee
-			// atomicity of record emission and offset state update
+				// emit the record, using the checkpoint lock to guarantee
+				// atomicity of record emission and offset state update
+				synchronized (checkpointLock) {
+					sourceContext.collectWithTimestamp(record, timestamp);
+					partitionState.setOffset(offset);
+				}
+			} else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+				emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
+			} else {
+				emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
+			}
+		} else {
+			// if the record is null, simply just update the offset state for partition
 			synchronized (checkpointLock) {
-				sourceContext.collectWithTimestamp(record, timestamp);
 				partitionState.setOffset(offset);
 			}
 		}
-		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
-		}
-		else {
-			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
-		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
new file mode 100644
index 0000000..c1a64c4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+@SuppressWarnings("serial")
+public class AbstractFetcherTest {
+
+	// ------------------------------------------------------------------------
+	//   Record emitting tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testSkipCorruptedRecord() throws Exception {
+		final String testTopic = "test topic name";
+		Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+			sourceContext,
+			originalPartitions,
+			null, /* periodic watermark assigner */
+			null, /* punctuated watermark assigner */
+			mock(TestProcessingTimeService.class),
+			0);
+
+		final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates()[0];
+
+		fetcher.emitRecord(1L, partitionStateHolder, 1L);
+		fetcher.emitRecord(2L, partitionStateHolder, 2L);
+		assertEquals(2L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(2L, partitionStateHolder.getOffset());
+
+		// emit null record
+		fetcher.emitRecord(null, partitionStateHolder, 3L);
+		assertEquals(2L, sourceContext.getLatestElement().getValue().longValue()); // the null record should be skipped
+		assertEquals(3L, partitionStateHolder.getOffset()); // the offset in state still should have advanced
+	}
+
+	@Test
+	public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception {
+		final String testTopic = "test topic name";
+		Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+			sourceContext,
+			originalPartitions,
+			null, /* periodic watermark assigner */
+			new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()), /* punctuated watermark assigner */
+			processingTimeProvider,
+			0);
+
+		final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates()[0];
+
+		// elements generate a watermark if the timestamp is a multiple of three
+		fetcher.emitRecord(1L, partitionStateHolder, 1L);
+		fetcher.emitRecord(2L, partitionStateHolder, 2L);
+		fetcher.emitRecord(3L, partitionStateHolder, 3L);
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+		assertEquals(3L, partitionStateHolder.getOffset());
+
+		// emit null record
+		fetcher.emitRecord(null, partitionStateHolder, 4L);
+
+		// no elements or watermarks should have been collected
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+		assertFalse(sourceContext.hasWatermark());
+		// the offset in state still should have advanced
+		assertEquals(4L, partitionStateHolder.getOffset());
+	}
+
+	@Test
+	public void testSkipCorruptedRecordWithPeriodicWatermarks() throws Exception {
+		final String testTopic = "test topic name";
+		Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+			sourceContext,
+			originalPartitions,
+			new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), /* periodic watermark assigner */
+			null, /* punctuated watermark assigner */
+			processingTimeProvider,
+			10);
+
+		final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates()[0];
+
+		// elements generate a watermark if the timestamp is a multiple of three
+		fetcher.emitRecord(1L, partitionStateHolder, 1L);
+		fetcher.emitRecord(2L, partitionStateHolder, 2L);
+		fetcher.emitRecord(3L, partitionStateHolder, 3L);
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+		assertEquals(3L, partitionStateHolder.getOffset());
+
+		// advance timer for watermark emitting
+		processingTimeProvider.setCurrentTime(10L);
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// emit null record
+		fetcher.emitRecord(null, partitionStateHolder, 4L);
+
+		// no elements should have been collected
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+		// the offset in state still should have advanced
+		assertEquals(4L, partitionStateHolder.getOffset());
+
+		// no watermarks should be collected
+		processingTimeProvider.setCurrentTime(20L);
+		assertFalse(sourceContext.hasWatermark());
+	}
+
+	// ------------------------------------------------------------------------
+	//   Timestamps & watermarks tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testPunctuatedWatermarks() throws Exception {
+		final String testTopic = "test topic name";
+		Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+				sourceContext,
+				originalPartitions,
+				null, /* periodic watermark assigner */
+				new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
+				processingTimeProvider,
+				0);
+
+		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
+		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
+		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
+
+		// elements generate a watermark if the timestamp is a multiple of three
+		
+		// elements for partition 1
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+		assertFalse(sourceContext.hasWatermark());
+
+		// elements for partition 2
+		fetcher.emitRecord(12L, part2, 1L);
+		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+		assertFalse(sourceContext.hasWatermark());
+
+		// elements for partition 3
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
+		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+		
+		// now, we should have a watermark
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+		
+		// advance partition 3
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
+		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+		// advance partition 1 beyond partition 2 - this bumps the watermark
+		fetcher.emitRecord(30L, part1, 4L);
+		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 2 again - this bumps the watermark
+		fetcher.emitRecord(13L, part2, 2L);
+		assertFalse(sourceContext.hasWatermark());
+		fetcher.emitRecord(14L, part2, 3L);
+		assertFalse(sourceContext.hasWatermark());
+		fetcher.emitRecord(15L, part2, 3L);
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
+	}
+	
+	@Test
+	public void testPeriodicWatermarks() throws Exception {
+		final String testTopic = "test topic name";
+		Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+				sourceContext,
+				originalPartitions,
+				new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
+				null, /* punctuated watermarks assigner*/
+				processingTimeService,
+				10);
+
+		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
+		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
+		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
+
+		// elements generate a watermark if the timestamp is a multiple of three
+
+		// elements for partition 1
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+
+		// elements for partition 2
+		fetcher.emitRecord(12L, part2, 1L);
+		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+
+		// elements for partition 3
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
+		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+		processingTimeService.setCurrentTime(10);
+
+		// now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
+		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 3
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
+		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+		// advance partition 1 beyond partition 2 - this bumps the watermark
+		fetcher.emitRecord(30L, part1, 4L);
+		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+
+		processingTimeService.setCurrentTime(20);
+
+		// this blocks until the periodic thread emitted the watermark
+		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 2 again - this bumps the watermark
+		fetcher.emitRecord(13L, part2, 2L);
+		fetcher.emitRecord(14L, part2, 3L);
+		fetcher.emitRecord(15L, part2, 3L);
+
+		processingTimeService.setCurrentTime(30);
+		// this blocks until the periodic thread emitted the watermark
+		long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
+		assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+
+		protected TestFetcher(
+				SourceContext<T> sourceContext,
+				Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
+				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+				ProcessingTimeService processingTimeProvider,
+				long autoWatermarkInterval) throws Exception
+		{
+			super(
+				sourceContext,
+				assignedPartitionsWithStartOffsets,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				processingTimeProvider,
+				autoWatermarkInterval,
+				TestFetcher.class.getClassLoader(),
+				false);
+		}
+
+		@Override
+		public void runFetchLoop() throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void cancel() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
+			return new Object();
+		}
+
+		@Override
+		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestSourceContext<T> implements SourceContext<T> {
+
+		private final Object checkpointLock = new Object();
+		private final Object watermarkLock = new Object();
+
+		private volatile StreamRecord<T> latestElement;
+		private volatile Watermark currentWatermark;
+
+		@Override
+		public void collect(T element) {
+			this.latestElement = new StreamRecord<>(element);
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			this.latestElement = new StreamRecord<>(element, timestamp);
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			synchronized (watermarkLock) {
+				currentWatermark = mark;
+				watermarkLock.notifyAll();
+			}
+		}
+
+
+		@Override
+		public void markAsTemporarilyIdle() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return checkpointLock;
+		}
+
+		@Override
+		public void close() {}
+
+		public StreamRecord<T> getLatestElement() {
+			return latestElement;
+		}
+
+		public boolean hasWatermark() {
+			return currentWatermark != null;
+		}
+		
+		public Watermark getLatestWatermark() throws InterruptedException {
+			synchronized (watermarkLock) {
+				while (currentWatermark == null) {
+					watermarkLock.wait();
+				}
+				Watermark wm = currentWatermark;
+				currentWatermark = null;
+				return wm;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
+
+		private volatile long maxTimestamp = Long.MIN_VALUE;
+		
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			maxTimestamp = Math.max(maxTimestamp, element);
+			return element;
+		}
+
+		@Nullable
+		@Override
+		public Watermark getCurrentWatermark() {
+			return new Watermark(maxTimestamp);
+		}
+	}
+
+	private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
+
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			return element;
+		}
+
+		@Nullable
+		@Override
+		public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+			return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
+		}
+		
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
deleted file mode 100644
index 17a375d..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-@SuppressWarnings("serial")
-public class AbstractFetcherTimestampsTest {
-	
-	@Test
-	public void testPunctuatedWatermarks() throws Exception {
-		final String testTopic = "test topic name";
-		Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
-		originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
-		originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
-		originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
-
-		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
-
-		TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
-
-		TestFetcher<Long> fetcher = new TestFetcher<>(
-				sourceContext,
-				originalPartitions,
-				null, /* periodic watermark assigner */
-				new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
-				processingTimeProvider,
-				0);
-
-		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
-		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
-		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
-
-		// elements generate a watermark if the timestamp is a multiple of three
-		
-		// elements for partition 1
-		fetcher.emitRecord(1L, part1, 1L);
-		fetcher.emitRecord(2L, part1, 2L);
-		fetcher.emitRecord(3L, part1, 3L);
-		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
-		assertFalse(sourceContext.hasWatermark());
-
-		// elements for partition 2
-		fetcher.emitRecord(12L, part2, 1L);
-		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
-		assertFalse(sourceContext.hasWatermark());
-
-		// elements for partition 3
-		fetcher.emitRecord(101L, part3, 1L);
-		fetcher.emitRecord(102L, part3, 2L);
-		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
-		
-		// now, we should have a watermark
-		assertTrue(sourceContext.hasWatermark());
-		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
-		
-		// advance partition 3
-		fetcher.emitRecord(1003L, part3, 3L);
-		fetcher.emitRecord(1004L, part3, 4L);
-		fetcher.emitRecord(1005L, part3, 5L);
-		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
-
-		// advance partition 1 beyond partition 2 - this bumps the watermark
-		fetcher.emitRecord(30L, part1, 4L);
-		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
-		assertTrue(sourceContext.hasWatermark());
-		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
-
-		// advance partition 2 again - this bumps the watermark
-		fetcher.emitRecord(13L, part2, 2L);
-		assertFalse(sourceContext.hasWatermark());
-		fetcher.emitRecord(14L, part2, 3L);
-		assertFalse(sourceContext.hasWatermark());
-		fetcher.emitRecord(15L, part2, 3L);
-		assertTrue(sourceContext.hasWatermark());
-		assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
-	}
-	
-	@Test
-	public void testPeriodicWatermarks() throws Exception {
-		final String testTopic = "test topic name";
-		Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
-		originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
-		originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
-		originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
-
-		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
-
-		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-
-		TestFetcher<Long> fetcher = new TestFetcher<>(
-				sourceContext,
-				originalPartitions,
-				new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
-				null, /* punctuated watermarks assigner*/
-				processingTimeService,
-				10);
-
-		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
-		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
-		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
-
-		// elements generate a watermark if the timestamp is a multiple of three
-
-		// elements for partition 1
-		fetcher.emitRecord(1L, part1, 1L);
-		fetcher.emitRecord(2L, part1, 2L);
-		fetcher.emitRecord(3L, part1, 3L);
-		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
-
-		// elements for partition 2
-		fetcher.emitRecord(12L, part2, 1L);
-		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
-
-		// elements for partition 3
-		fetcher.emitRecord(101L, part3, 1L);
-		fetcher.emitRecord(102L, part3, 2L);
-		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
-
-		processingTimeService.setCurrentTime(10);
-
-		// now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
-		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
-
-		// advance partition 3
-		fetcher.emitRecord(1003L, part3, 3L);
-		fetcher.emitRecord(1004L, part3, 4L);
-		fetcher.emitRecord(1005L, part3, 5L);
-		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
-
-		// advance partition 1 beyond partition 2 - this bumps the watermark
-		fetcher.emitRecord(30L, part1, 4L);
-		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
-		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
-
-		processingTimeService.setCurrentTime(20);
-
-		// this blocks until the periodic thread emitted the watermark
-		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
-
-		// advance partition 2 again - this bumps the watermark
-		fetcher.emitRecord(13L, part2, 2L);
-		fetcher.emitRecord(14L, part2, 3L);
-		fetcher.emitRecord(15L, part2, 3L);
-
-		processingTimeService.setCurrentTime(30);
-		// this blocks until the periodic thread emitted the watermark
-		long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
-		assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Test mocks
-	// ------------------------------------------------------------------------
-
-	private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
-
-		protected TestFetcher(
-				SourceContext<T> sourceContext,
-				Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
-				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-				ProcessingTimeService processingTimeProvider,
-				long autoWatermarkInterval) throws Exception
-		{
-			super(
-				sourceContext,
-				assignedPartitionsWithStartOffsets,
-				watermarksPeriodic,
-				watermarksPunctuated,
-				processingTimeProvider,
-				autoWatermarkInterval,
-				TestFetcher.class.getClassLoader(),
-				false);
-		}
-
-		@Override
-		public void runFetchLoop() throws Exception {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public void cancel() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
-			return new Object();
-		}
-
-		@Override
-		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
-			throw new UnsupportedOperationException();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static final class TestSourceContext<T> implements SourceContext<T> {
-
-		private final Object checkpointLock = new Object();
-		private final Object watermarkLock = new Object();
-
-		private volatile StreamRecord<T> latestElement;
-		private volatile Watermark currentWatermark;
-
-		@Override
-		public void collect(T element) {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			this.latestElement = new StreamRecord<>(element, timestamp);
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			synchronized (watermarkLock) {
-				currentWatermark = mark;
-				watermarkLock.notifyAll();
-			}
-		}
-
-
-		@Override
-		public void markAsTemporarilyIdle() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return checkpointLock;
-		}
-
-		@Override
-		public void close() {}
-
-		public StreamRecord<T> getLatestElement() {
-			return latestElement;
-		}
-
-		public boolean hasWatermark() {
-			return currentWatermark != null;
-		}
-		
-		public Watermark getLatestWatermark() throws InterruptedException {
-			synchronized (watermarkLock) {
-				while (currentWatermark == null) {
-					watermarkLock.wait();
-				}
-				Watermark wm = currentWatermark;
-				currentWatermark = null;
-				return wm;
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
-
-		private volatile long maxTimestamp = Long.MIN_VALUE;
-		
-		@Override
-		public long extractTimestamp(Long element, long previousElementTimestamp) {
-			maxTimestamp = Math.max(maxTimestamp, element);
-			return element;
-		}
-
-		@Nullable
-		@Override
-		public Watermark getCurrentWatermark() {
-			return new Watermark(maxTimestamp);
-		}
-	}
-
-	private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
-
-		@Override
-		public long extractTimestamp(Long element, long previousElementTimestamp) {
-			return element;
-		}
-
-		@Nullable
-		@Override
-		public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
-			return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
-		}
-		
-	}
-}