Posted to commits@flink.apache.org by se...@apache.org on 2016/11/16 18:09:53 UTC
[3/5] flink git commit: [hotfix] [Kafka Consumer] Clean up some code confusion and style in the Fetchers for Kafka 0.9/0.10
[hotfix] [Kafka Consumer] Clean up some code confusion and style in the Fetchers for Kafka 0.9/0.10
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa1864c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa1864c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa1864c7
Branch: refs/heads/master
Commit: fa1864c7a6eadea55eb2d7e8fd2b72e043841671
Parents: 611412c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 9 17:58:54 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 16 19:08:07 2016 +0100
----------------------------------------------------------------------
.../flink-connector-kafka-0.10/pom.xml | 6 ++
.../kafka/internal/Kafka010Fetcher.java | 39 +++++--------
.../connectors/kafka/Kafka010FetcherTest.java | 1 -
.../kafka/internals/SimpleConsumerThread.java | 2 +-
.../kafka/internal/Kafka09Fetcher.java | 25 +++++---
.../kafka/internals/AbstractFetcher.java | 60 ++++++++++++++------
.../AbstractFetcherTimestampsTest.java | 53 +++++++++--------
7 files changed, 107 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
index 8108afc..04019f8 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -48,6 +48,12 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- Add Kafka 0.10.x as a dependency -->
http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 4a1f5f6..024cd38 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -38,6 +38,9 @@ import java.util.Properties;
/**
* A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API.
*
+ * <p>This fetcher reuses essentially all functionality of the 0.9 fetcher. In addition, it
+ * takes the timestamp attached to the Kafka ConsumerRecord and attaches it to the Flink records.
+ *
* @param <T> The type of elements produced by the fetcher.
*/
public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
@@ -76,37 +79,23 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
}
@Override
- protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
- consumer.assign(topicPartitions);
- }
+ protected void emitRecord(
+ T record,
+ KafkaTopicPartitionState<TopicPartition> partition,
+ long offset,
+ ConsumerRecord<?, ?> consumerRecord) throws Exception {
- @Override
- protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
- // get timestamp from provided ConsumerRecord (only possible with kafka 0.10.x)
- super.emitRecord(record, partition, offset, consumerRecord.timestamp());
+ // we attach the Kafka 0.10 timestamp here
+ emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp());
}
/**
- * Emit record Kafka-timestamp aware.
+ * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
+ * changing the List in the signature to a Collection.
*/
@Override
- protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception {
- if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
- // fast path logic, in case there are no watermarks
-
- // emit the record, using the checkpoint lock to guarantee
- // atomicity of record emission and offset state update
- synchronized (checkpointLock) {
- sourceContext.collectWithTimestamp(record, timestamp);
- partitionState.setOffset(offset);
- }
- }
- else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
- emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
- }
- else {
- emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
- }
+ protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) {
+ consumer.assign(topicPartitions);
}
@Override
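
The net effect in this file: the 0.10 fetcher overrides exactly two hooks of the 0.9 fetcher, one to attach the broker timestamp at emission and one to compile the assign() call against the 0.10 client. A minimal, self-contained sketch of that template-method split, using hypothetical classes rather than the actual fetcher code:

public class EmitPathSketch {

    // stands in for Kafka09Fetcher: the base class ignores any record timestamp
    static class Base09Fetcher {
        protected void emitRecord(String record, long offset, long kafkaTimestamp) {
            System.out.println("emit " + record + " @offset " + offset + " (no timestamp)");
        }
    }

    // stands in for Kafka010Fetcher: the subclass forwards the broker timestamp
    static class Sub010Fetcher extends Base09Fetcher {
        @Override
        protected void emitRecord(String record, long offset, long kafkaTimestamp) {
            System.out.println("emit " + record + " @offset " + offset + " ts=" + kafkaTimestamp);
        }
    }

    public static void main(String[] args) {
        Base09Fetcher fetcher = new Sub010Fetcher();
        // the shared fetch loop calls the hook; the subclass decides what to do with the timestamp
        fetcher.emitRecord("value", 42L, 1479315600000L);
    }
}
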
http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 718db48..037d25b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -114,7 +114,6 @@ public class Kafka010FetcherTest {
SourceContext<String> sourceContext = mock(SourceContext.class);
List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
- StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 1302348..35e491a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -376,7 +376,7 @@ class SimpleConsumerThread<T> extends Thread {
continue partitionsLoop;
}
- owner.emitRecord(value, currentPartition, offset, Long.MIN_VALUE);
+ owner.emitRecord(value, currentPartition, offset);
}
else {
// no longer running
http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index a8c0397..acdcb61 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -201,7 +201,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
try {
assignPartitionsToConsumer(consumer, convertKafkaPartitions(subscribedPartitions()));
-
if (useMetrics) {
final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
addOffsetStateGauge(kafkaMetricGroup);
@@ -306,14 +305,22 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
}
}
- // Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method.
- protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
- emitRecord(record, partition, offset, Long.MIN_VALUE);
+ // ------------------------------------------------------------------------
+ // The methods below are overridden in the 0.10 fetcher, which otherwise
+ // reuses most of the 0.9 fetcher's behavior
+ // ------------------------------------------------------------------------
+
+ protected void emitRecord(
+ T record,
+ KafkaTopicPartitionState<TopicPartition> partition,
+ long offset,
+ @SuppressWarnings("UnusedParameters") ConsumerRecord<?, ?> consumerRecord) throws Exception {
+
+ // the 0.9 Fetcher does not try to extract a timestamp
+ emitRecord(record, partition, offset);
}
- /**
- * Protected method to make the partition assignment pluggable, for different Kafka versions.
- */
- protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
+
+ protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) {
consumer.assign(topicPartitions);
}
@@ -322,7 +329,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
}
// ------------------------------------------------------------------------
- // Kafka 0.9 specific fetcher behavior
+ // Implement Methods of the AbstractFetcher
// ------------------------------------------------------------------------
@Override
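
The overridability of assignPartitionsToConsumer exists for binary, not source, compatibility: Kafka 0.9's KafkaConsumer.assign takes a List, while Kafka 0.10's takes a Collection, so bytecode compiled against one client fails method lookup against the other. A small sketch of the effect, using hypothetical interfaces rather than the real clients:

import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class AssignCompatSketch {
    // stands in for the 0.9 client's signature
    interface Consumer09 { void assign(List<String> partitions); }
    // stands in for the 0.10 client's signature
    interface Consumer010 { void assign(Collection<String> partitions); }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("test-0", "test-1");
        Consumer09 c9 = p -> System.out.println("0.9 assign(List): " + p);
        Consumer010 c10 = p -> System.out.println("0.10 assign(Collection): " + p);
        c9.assign(partitions);
        c10.assign(partitions);
        // A call site compiled as assign(Ljava/util/List;)V against the 0.9 client
        // would throw NoSuchMethodError on the 0.10 client, whose descriptor is
        // assign(Ljava/util/Collection;)V. Overriding the wrapper in each connector
        // lets each module compile against its own client.
    }
}
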
http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 3350b06..cf39606 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -205,32 +205,60 @@ public abstract class AbstractFetcher<T, KPH> {
}
}
}
-
+
// ------------------------------------------------------------------------
// emitting records
// ------------------------------------------------------------------------
/**
+ * Emits a record without attaching an existing timestamp to it.
+ *
* <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
* That makes the fast path efficient; the extended paths are called as separate methods.
+ *
* @param record The record to emit
* @param partitionState The state of the Kafka partition from which the record was fetched
* @param offset The offset of the record
- * @param timestamp The record's event-timestamp
*/
- protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
+ protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
// fast path logic, in case there are no watermarks
// emit the record, using the checkpoint lock to guarantee
// atomicity of record emission and offset state update
synchronized (checkpointLock) {
- if(timestamp != Long.MIN_VALUE) {
- // this case is true for Kafka 0.10
- sourceContext.collectWithTimestamp(record, timestamp);
- } else {
- sourceContext.collect(record);
- }
+ sourceContext.collect(record);
+ partitionState.setOffset(offset);
+ }
+ }
+ else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+ emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
+ }
+ else {
+ emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
+ }
+ }
+
+ /**
+ * Emits a record attaching a timestamp to it.
+ *
+ * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
+ * That makes the fast path efficient; the extended paths are called as separate methods.
+ *
+ * @param record The record to emit
+ * @param partitionState The state of the Kafka partition from which the record was fetched
+ * @param offset The offset of the record
+ * @param timestamp The timestamp to attach to the record
+ */
+ protected void emitRecordWithTimestamp(
+ T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
+
+ if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+ // fast path logic, in case there are no watermarks generated in the fetcher
+
+ // emit the record, using the checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collectWithTimestamp(record, timestamp);
partitionState.setOffset(offset);
}
}
@@ -285,14 +313,14 @@ public abstract class AbstractFetcher<T, KPH> {
// from the punctuated extractor
final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-
+
// emit the record with timestamp, using the usual checkpoint lock to guarantee
// atomicity of record emission and offset state update
synchronized (checkpointLock) {
sourceContext.collectWithTimestamp(record, timestamp);
partitionState.setOffset(offset);
}
-
+
// if we also have a new per-partition watermark, check if that is also a
// new cross-partition watermark
if (newWatermark != null) {
@@ -306,7 +334,7 @@ public abstract class AbstractFetcher<T, KPH> {
private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
long newMin = Long.MAX_VALUE;
-
+
for (KafkaTopicPartitionState<?> state : allPartitions) {
@SuppressWarnings("unchecked")
final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
@@ -314,7 +342,7 @@ public abstract class AbstractFetcher<T, KPH> {
newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
}
-
+
// double-check locking pattern
if (newMin > maxWatermarkSoFar) {
synchronized (checkpointLock) {
@@ -416,7 +444,7 @@ public abstract class AbstractFetcher<T, KPH> {
// add current offsets to gauge
MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
- for(KafkaTopicPartitionState ktp: subscribedPartitions()){
+ for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
}
@@ -435,10 +463,10 @@ public abstract class AbstractFetcher<T, KPH> {
*/
private static class OffsetGauge implements Gauge<Long> {
- private final KafkaTopicPartitionState ktp;
+ private final KafkaTopicPartitionState<?> ktp;
private final OffsetGaugeType gaugeType;
- public OffsetGauge(KafkaTopicPartitionState ktp, OffsetGaugeType gaugeType) {
+ public OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
this.ktp = ktp;
this.gaugeType = gaugeType;
}
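
The checkpoint-lock discipline seen throughout these emit paths is the core invariant of the fetcher: a record and its offset update must become visible atomically, so a checkpoint never records an offset whose record has not been emitted. A self-contained sketch of that invariant, with a hypothetical class that is not part of this commit:

public class CheckpointLockSketch {
    private final Object checkpointLock = new Object();
    private long offset = -1L;

    // emission and offset update happen under one lock, as in emitRecord(...)
    void emit(String record, long newOffset) {
        synchronized (checkpointLock) {
            System.out.println("emit " + record);
            offset = newOffset;
        }
    }

    // a checkpoint takes the same lock, so it sees a consistent offset
    long snapshotOffset() {
        synchronized (checkpointLock) {
            return offset;
        }
    }

    public static void main(String[] args) {
        CheckpointLockSketch fetcher = new CheckpointLockSketch();
        fetcher.emit("a", 0L);
        System.out.println("checkpointed offset = " + fetcher.snapshotOffset());
    }
}
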
http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 5801c24..0b3507a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
@@ -67,22 +66,22 @@ public class AbstractFetcherTimestampsTest {
// elements generate a watermark if the timestamp is a multiple of three
// elements for partition 1
- fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
- fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
- fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
+ fetcher.emitRecord(1L, part1, 1L);
+ fetcher.emitRecord(2L, part1, 2L);
+ fetcher.emitRecord(3L, part1, 3L);
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
assertFalse(sourceContext.hasWatermark());
// elements for partition 2
- fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
+ fetcher.emitRecord(12L, part2, 1L);
assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
assertFalse(sourceContext.hasWatermark());
// elements for partition 3
- fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
- fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
+ fetcher.emitRecord(101L, part3, 1L);
+ fetcher.emitRecord(102L, part3, 2L);
assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
@@ -91,25 +90,25 @@ public class AbstractFetcherTimestampsTest {
assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 3
- fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
- fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
- fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
+ fetcher.emitRecord(1003L, part3, 3L);
+ fetcher.emitRecord(1004L, part3, 4L);
+ fetcher.emitRecord(1005L, part3, 5L);
assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
// advance partition 1 beyond partition 2 - this bumps the watermark
- fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
+ fetcher.emitRecord(30L, part1, 4L);
assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
assertTrue(sourceContext.hasWatermark());
assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 2 again - this bumps the watermark
- fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
+ fetcher.emitRecord(13L, part2, 2L);
assertFalse(sourceContext.hasWatermark());
- fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
+ fetcher.emitRecord(14L, part2, 3L);
assertFalse(sourceContext.hasWatermark());
- fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
+ fetcher.emitRecord(15L, part2, 3L);
assertTrue(sourceContext.hasWatermark());
assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
}
@@ -141,20 +140,20 @@ public class AbstractFetcherTimestampsTest {
// elements generate a watermark if the timestamp is a multiple of three
// elements for partition 1
- fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
- fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
- fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
+ fetcher.emitRecord(1L, part1, 1L);
+ fetcher.emitRecord(2L, part1, 2L);
+ fetcher.emitRecord(3L, part1, 3L);
assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
// elements for partition 2
- fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
+ fetcher.emitRecord(12L, part2, 1L);
assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
// elements for partition 3
- fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
- fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
+ fetcher.emitRecord(101L, part3, 1L);
+ fetcher.emitRecord(102L, part3, 2L);
assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
@@ -164,14 +163,14 @@ public class AbstractFetcherTimestampsTest {
assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 3
- fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
- fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
- fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
+ fetcher.emitRecord(1003L, part3, 3L);
+ fetcher.emitRecord(1004L, part3, 4L);
+ fetcher.emitRecord(1005L, part3, 5L);
assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
// advance partition 1 beyond partition 2 - this bumps the watermark
- fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
+ fetcher.emitRecord(30L, part1, 4L);
assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
@@ -181,9 +180,9 @@ public class AbstractFetcherTimestampsTest {
assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
// advance partition 2 again - this bumps the watermark
- fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
- fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
- fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
+ fetcher.emitRecord(13L, part2, 2L);
+ fetcher.emitRecord(14L, part2, 3L);
+ fetcher.emitRecord(15L, part2, 3L);
processingTimeService.setCurrentTime(30);
// this blocks until the periodic thread emitted the watermark
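
Both tests rely on the same extractor convention: each element's value doubles as its timestamp, and (in the punctuated case) a per-partition watermark is produced exactly when that timestamp is a multiple of three. A plain-Java sketch of that rule, independent of the actual test classes:

public class PunctuatedWatermarkSketch {
    // returns a watermark timestamp, or null if this element produces none
    static Long checkAndGetNewWatermark(long timestamp) {
        if (timestamp % 3 == 0) {
            return timestamp;
        }
        return null;
    }

    public static void main(String[] args) {
        for (long ts : new long[] {1L, 2L, 3L, 12L, 30L}) {
            System.out.println("element " + ts + " -> watermark " + checkAndGetNewWatermark(ts));
        }
    }
}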