You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/16 18:09:53 UTC

[3/5] flink git commit: [hotfix] [Kafka Consumer] Clean up some code confusion and style in the Fetchers for Kafka 0.9/0.10

[hotfix] [Kafka Consumer] Clean up some code confusion and style in the Fetchers for Kafka 0.9/0.10


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa1864c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa1864c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa1864c7

Branch: refs/heads/master
Commit: fa1864c7a6eadea55eb2d7e8fd2b72e043841671
Parents: 611412c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 9 17:58:54 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 16 19:08:07 2016 +0100

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |  6 ++
 .../kafka/internal/Kafka010Fetcher.java         | 39 +++++--------
 .../connectors/kafka/Kafka010FetcherTest.java   |  1 -
 .../kafka/internals/SimpleConsumerThread.java   |  2 +-
 .../kafka/internal/Kafka09Fetcher.java          | 25 +++++---
 .../kafka/internals/AbstractFetcher.java        | 60 ++++++++++++++------
 .../AbstractFetcherTimestampsTest.java          | 53 +++++++++--------
 7 files changed, 107 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
index 8108afc..04019f8 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -48,6 +48,12 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
 			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka_${scala.binary.version}</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<!-- Add Kafka 0.10.x as a dependency -->

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 4a1f5f6..024cd38 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -38,6 +38,9 @@ import java.util.Properties;
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API.
  * 
+ * <p>This fetcher re-uses basically all functionality of the 0.9 fetcher. It only additionally
+ * takes the KafkaRecord-attached timestamp and attaches it to the Flink records.
+ * 
  * @param <T> The type of elements produced by the fetcher.
  */
 public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
@@ -76,37 +79,23 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 	}
 
 	@Override
-	protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
-		consumer.assign(topicPartitions);
-	}
+	protected void emitRecord(
+			T record,
+			KafkaTopicPartitionState<TopicPartition> partition,
+			long offset,
+			ConsumerRecord<?, ?> consumerRecord) throws Exception {
 
-	@Override
-	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
-		// get timestamp from provided ConsumerRecord (only possible with kafka 0.10.x)
-		super.emitRecord(record, partition, offset, consumerRecord.timestamp());
+		// we attach the Kafka 0.10 timestamp here
+		emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp());
 	}
 
 	/**
-	 * Emit record Kafka-timestamp aware.
+	 * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
+	 * changing the List in the signature to a Collection.
 	 */
 	@Override
-	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception {
-		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-			// fast path logic, in case there are no watermarks
-
-			// emit the record, using the checkpoint lock to guarantee
-			// atomicity of record emission and offset state update
-			synchronized (checkpointLock) {
-				sourceContext.collectWithTimestamp(record, timestamp);
-				partitionState.setOffset(offset);
-			}
-		}
-		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
-		}
-		else {
-			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
-		}
+	protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) {
+		consumer.assign(topicPartitions);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 718db48..037d25b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -114,7 +114,6 @@ public class Kafka010FetcherTest {
         SourceContext<String> sourceContext = mock(SourceContext.class);
         List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
         KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-        StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
 
         final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
                 sourceContext,

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 1302348..35e491a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -376,7 +376,7 @@ class SimpleConsumerThread<T> extends Thread {
 								continue partitionsLoop;
 							}
 							
-							owner.emitRecord(value, currentPartition, offset, Long.MIN_VALUE);
+							owner.emitRecord(value, currentPartition, offset);
 						}
 						else {
 							// no longer running

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index a8c0397..acdcb61 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -201,7 +201,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		try {
 			assignPartitionsToConsumer(consumer, convertKafkaPartitions(subscribedPartitions()));
 
-
 			if (useMetrics) {
 				final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
 				addOffsetStateGauge(kafkaMetricGroup);
@@ -306,14 +305,22 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		}
 	}
 
-	// Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method.
-	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
-		emitRecord(record, partition, offset, Long.MIN_VALUE);
+	// ------------------------------------------------------------------------
+	//  The below methods are overridden in the 0.10 fetcher, which otherwise
+	//   reuses most of the 0.9 fetcher behavior
+	// ------------------------------------------------------------------------
+
+	protected void emitRecord(
+			T record,
+			KafkaTopicPartitionState<TopicPartition> partition,
+			long offset,
+			@SuppressWarnings("UnusedParameters") ConsumerRecord<?, ?> consumerRecord) throws Exception {
+
+		// the 0.9 Fetcher does not try to extract a timestamp
+		emitRecord(record, partition, offset);
 	}
-	/**
-	 * Protected method to make the partition assignment pluggable, for different Kafka versions.
-	 */
-	protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
+
+	protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) {
 		consumer.assign(topicPartitions);
 	}
 
@@ -322,7 +329,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 	}
 
 	// ------------------------------------------------------------------------
-	//  Kafka 0.9 specific fetcher behavior
+	//  Implement Methods of the AbstractFetcher
 	// ------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 3350b06..cf39606 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -205,32 +205,60 @@ public abstract class AbstractFetcher<T, KPH> {
 			}
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  emitting records
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Emits a record without attaching an existing timestamp to it.
+	 * 
 	 * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
 	 * That makes the fast path efficient, the extended paths are called as separate methods.
+	 * 
 	 * @param record The record to emit
 	 * @param partitionState The state of the Kafka partition from which the record was fetched
 	 * @param offset The offset of the record
-	 * @param timestamp The record's event-timestamp
 	 */
-	protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
+	protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
 		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
 			// fast path logic, in case there are no watermarks
 
 			// emit the record, using the checkpoint lock to guarantee
 			// atomicity of record emission and offset state update
 			synchronized (checkpointLock) {
-				if(timestamp != Long.MIN_VALUE) {
-					// this case is true for Kafka 0.10
-					sourceContext.collectWithTimestamp(record, timestamp);
-				} else {
-					sourceContext.collect(record);
-				}
+				sourceContext.collect(record);
+				partitionState.setOffset(offset);
+			}
+		}
+		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
+		}
+		else {
+			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
+		}
+	}
+
+	/**
+	 * Emits a record attaching a timestamp to it.
+	 *
+	 * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
+	 * That makes the fast path efficient, the extended paths are called as separate methods.
+	 *
+	 * @param record The record to emit
+	 * @param partitionState The state of the Kafka partition from which the record was fetched
+	 * @param offset The offset of the record
+	 */
+	protected void emitRecordWithTimestamp(
+			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
+
+		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+			// fast path logic, in case there are no watermarks generated in the fetcher
+
+			// emit the record, using the checkpoint lock to guarantee
+			// atomicity of record emission and offset state update
+			synchronized (checkpointLock) {
+				sourceContext.collectWithTimestamp(record, timestamp);
 				partitionState.setOffset(offset);
 			}
 		}
@@ -285,14 +313,14 @@ public abstract class AbstractFetcher<T, KPH> {
 		// from the punctuated extractor
 		final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
 		final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-			
+
 		// emit the record with timestamp, using the usual checkpoint lock to guarantee
 		// atomicity of record emission and offset state update 
 		synchronized (checkpointLock) {
 			sourceContext.collectWithTimestamp(record, timestamp);
 			partitionState.setOffset(offset);
 		}
-		
+
 		// if we also have a new per-partition watermark, check if that is also a
 		// new cross-partition watermark
 		if (newWatermark != null) {
@@ -306,7 +334,7 @@ public abstract class AbstractFetcher<T, KPH> {
 	private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
 		if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
 			long newMin = Long.MAX_VALUE;
-			
+
 			for (KafkaTopicPartitionState<?> state : allPartitions) {
 				@SuppressWarnings("unchecked")
 				final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
@@ -314,7 +342,7 @@ public abstract class AbstractFetcher<T, KPH> {
 				
 				newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
 			}
-			
+
 			// double-check locking pattern
 			if (newMin > maxWatermarkSoFar) {
 				synchronized (checkpointLock) {
@@ -416,7 +444,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		// add current offsets to gage
 		MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
 		MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
-		for(KafkaTopicPartitionState ktp: subscribedPartitions()){
+		for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
 			currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
 			committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
 		}
@@ -435,10 +463,10 @@ public abstract class AbstractFetcher<T, KPH> {
 	 */
 	private static class OffsetGauge implements Gauge<Long> {
 
-		private final KafkaTopicPartitionState ktp;
+		private final KafkaTopicPartitionState<?> ktp;
 		private final OffsetGaugeType gaugeType;
 
-		public OffsetGauge(KafkaTopicPartitionState ktp, OffsetGaugeType gaugeType) {
+		public OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
 			this.ktp = ktp;
 			this.gaugeType = gaugeType;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 5801c24..0b3507a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 
@@ -67,22 +66,22 @@ public class AbstractFetcherTimestampsTest {
 		// elements generate a watermark if the timestamp is a multiple of three
 		
 		// elements for partition 1
-		fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
-		fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
-		fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
 		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
 		assertFalse(sourceContext.hasWatermark());
 
 		// elements for partition 2
-		fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
+		fetcher.emitRecord(12L, part2, 1L);
 		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
 		assertFalse(sourceContext.hasWatermark());
 
 		// elements for partition 3
-		fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
-		fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
 		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
 		
@@ -91,25 +90,25 @@ public class AbstractFetcherTimestampsTest {
 		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
 		
 		// advance partition 3
-		fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
-		fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
-		fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
 		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
 
 		// advance partition 1 beyond partition 2 - this bumps the watermark
-		fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
+		fetcher.emitRecord(30L, part1, 4L);
 		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
 		assertTrue(sourceContext.hasWatermark());
 		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
 
 		// advance partition 2 again - this bumps the watermark
-		fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
+		fetcher.emitRecord(13L, part2, 2L);
 		assertFalse(sourceContext.hasWatermark());
-		fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(14L, part2, 3L);
 		assertFalse(sourceContext.hasWatermark());
-		fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(15L, part2, 3L);
 		assertTrue(sourceContext.hasWatermark());
 		assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
 	}
@@ -141,20 +140,20 @@ public class AbstractFetcherTimestampsTest {
 		// elements generate a watermark if the timestamp is a multiple of three
 
 		// elements for partition 1
-		fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
-		fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
-		fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
 		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
 
 		// elements for partition 2
-		fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
+		fetcher.emitRecord(12L, part2, 1L);
 		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
 
 		// elements for partition 3
-		fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
-		fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
 		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
 
@@ -164,14 +163,14 @@ public class AbstractFetcherTimestampsTest {
 		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
 
 		// advance partition 3
-		fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
-		fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
-		fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
 		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
 
 		// advance partition 1 beyond partition 2 - this bumps the watermark
-		fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
+		fetcher.emitRecord(30L, part1, 4L);
 		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
 
@@ -181,9 +180,9 @@ public class AbstractFetcherTimestampsTest {
 		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
 
 		// advance partition 2 again - this bumps the watermark
-		fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
-		fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
-		fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(13L, part2, 2L);
+		fetcher.emitRecord(14L, part2, 3L);
+		fetcher.emitRecord(15L, part2, 3L);
 
 		processingTimeService.setCurrentTime(30);
 		// this blocks until the periodic thread emitted the watermark