You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/09 06:06:49 UTC
[2/2] flink git commit: [FLINK-3679] [kafka] Improve null record
handling for FlinkKafkaConsumer
[FLINK-3679] [kafka] Improve null record handling for FlinkKafkaConsumer
This commit generally improves null record handling by:
- also update the offset in state holders if record is null
- move null record related tests to AbstractFetcherTest, so that
behaviour is tested for all fetcher implementations
- let the docs be more informative of the behaviour of the consumer
when corrupted messages are encountered.
This closes #3314.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c39ad31f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c39ad31f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c39ad31f
Branch: refs/heads/master
Commit: c39ad31f3c321a1803abc97b2ba3074561d9af6e
Parents: afb4c5e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Mar 9 01:28:08 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Mar 9 14:05:38 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/kafka.md | 14 +-
.../connectors/kafka/Kafka09FetcherTest.java | 84 ----
.../kafka/internals/AbstractFetcher.java | 57 ++-
.../kafka/internals/AbstractFetcherTest.java | 461 +++++++++++++++++++
.../AbstractFetcherTimestampsTest.java | 335 --------------
5 files changed, 503 insertions(+), 448 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 331c9c7..06e40b2 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -146,10 +146,6 @@ The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into
`DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
method gets called for each Kafka message, passing the value from Kafka.
-There are two possible design choices when the `DeserializationSchema` encounters a corrupted message. It can
-either throw an `IOException` which causes the pipeline to be restarted, or it can return `null` where the Flink
-Kafka consumer will silently skip the corrupted message.
-
It is usually helpful to start from the `AbstractDeserializationSchema`, which takes care of describing the
produced Java/Scala type to Flink's type system. Users that implement a vanilla `DeserializationSchema` need
to implement the `getProducedType(...)` method themselves.
@@ -167,6 +163,16 @@ For convenience, Flink provides the following schemas:
into an ObjectNode object, from which fields can be accessed using objectNode.get("field").as(Int/String/...)().
The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
an optional "metadata" field that exposes the offset/partition/topic for this message.
+
+When encountering a corrupted message that cannot be deserialized for any reason, there
+are two options - either throwing an exception from the `deserialize(...)` method
+which will cause the job to fail and be restarted, or returning `null` to allow
+the Flink Kafka consumer to silently skip the corrupted message. Note that
+due to the consumer's fault tolerance (see the sections below for more details),
+failing the job on the corrupted message will let the consumer attempt
+to deserialize the message again. Therefore, if deserialization still fails, the
+consumer will fall into a non-stop restart and fail loop on that corrupted
+message.
### Kafka Consumers Start Position Configuration
http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 61a8855..49144e6 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -31,7 +29,6 @@ import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.util.CollectingSourceContext;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -52,8 +49,6 @@ import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -424,85 +419,6 @@ public class Kafka09FetcherTest {
assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
}
- @Test
- public void testSkipCorruptedMessage() throws Exception {
-
- // ----- some test data -----
-
- final String topic = "test-topic";
- final int partition = 3;
- final byte[] payload = new byte[] {1, 2, 3, 4};
-
- final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
- new ConsumerRecord<>(topic, partition, 15, payload, payload),
- new ConsumerRecord<>(topic, partition, 16, payload, payload),
- new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
-
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
- data.put(new TopicPartition(topic, partition), records);
-
- final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
-
- // ----- the test consumer -----
-
- final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
- return consumerRecords;
- }
- });
-
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- build a fetcher -----
-
- ArrayList<String> results = new ArrayList<>();
- SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
- Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
- Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
-
- @Override
- public String deserialize(byte[] messageKey, byte[] message,
- String topic, int partition, long offset) throws IOException {
- return offset == 15 ? null : new String(message);
- }
-
- @Override
- public boolean isEndOfStream(String nextElement) {
- return "end".equals(nextElement);
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
- };
-
- final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
- sourceContext,
- partitionsWithInitialOffsets,
- null, /* periodic watermark extractor */
- null, /* punctuated watermark extractor */
- new TestProcessingTimeService(),
- 10, /* watermark interval */
- this.getClass().getClassLoader(),
- true, /* checkpointing */
- "task_name",
- new UnregisteredMetricsGroup(),
- schema,
- new Properties(),
- 0L,
- false);
-
-
- // ----- run the fetcher -----
-
- fetcher.runFetchLoop();
- assertEquals(1, results.size());
- }
-
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 76ce1a0..b8ac980 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -213,26 +213,28 @@ public abstract class AbstractFetcher<T, KPH> {
* @param offset The offset of the record
*/
protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
- if (record == null) {
- return;
- }
- if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
- // fast path logic, in case there are no watermarks
+ if (record != null) {
+ if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+ // fast path logic, in case there are no watermarks
- // emit the record, using the checkpoint lock to guarantee
- // atomicity of record emission and offset state update
+ // emit the record, using the checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collect(record);
+ partitionState.setOffset(offset);
+ }
+ } else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+ emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
+ } else {
+ emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
+ }
+ } else {
+ // if the record is null, just update the offset state for the partition
synchronized (checkpointLock) {
- sourceContext.collect(record);
partitionState.setOffset(offset);
}
}
- else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
- emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
- }
- else {
- emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
- }
}
/**
@@ -248,22 +250,27 @@ public abstract class AbstractFetcher<T, KPH> {
protected void emitRecordWithTimestamp(
T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
- if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
- // fast path logic, in case there are no watermarks generated in the fetcher
+ if (record != null) {
+ if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+ // fast path logic, in case there are no watermarks generated in the fetcher
- // emit the record, using the checkpoint lock to guarantee
- // atomicity of record emission and offset state update
+ // emit the record, using the checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collectWithTimestamp(record, timestamp);
+ partitionState.setOffset(offset);
+ }
+ } else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+ emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
+ } else {
+ emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
+ }
+ } else {
+ // if the record is null, just update the offset state for the partition
synchronized (checkpointLock) {
- sourceContext.collectWithTimestamp(record, timestamp);
partitionState.setOffset(offset);
}
}
- else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
- emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
- }
- else {
- emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
new file mode 100644
index 0000000..c1a64c4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+@SuppressWarnings("serial")
+public class AbstractFetcherTest {
+
+ // ------------------------------------------------------------------------
+ // Record emitting tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testSkipCorruptedRecord() throws Exception {
+ final String testTopic = "test topic name";
+ Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+ TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+ TestFetcher<Long> fetcher = new TestFetcher<>(
+ sourceContext,
+ originalPartitions,
+ null, /* periodic watermark assigner */
+ null, /* punctuated watermark assigner */
+ mock(TestProcessingTimeService.class),
+ 0);
+
+ final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates()[0];
+
+ fetcher.emitRecord(1L, partitionStateHolder, 1L);
+ fetcher.emitRecord(2L, partitionStateHolder, 2L);
+ assertEquals(2L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(2L, partitionStateHolder.getOffset());
+
+ // emit null record
+ fetcher.emitRecord(null, partitionStateHolder, 3L);
+ assertEquals(2L, sourceContext.getLatestElement().getValue().longValue()); // the null record should be skipped
+ assertEquals(3L, partitionStateHolder.getOffset()); // the offset in state still should have advanced
+ }
+
+ @Test
+ public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception {
+ final String testTopic = "test topic name";
+ Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+ TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+ TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
+
+ TestFetcher<Long> fetcher = new TestFetcher<>(
+ sourceContext,
+ originalPartitions,
+ null, /* periodic watermark assigner */
+ new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()), /* punctuated watermark assigner */
+ processingTimeProvider,
+ 0);
+
+ final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates()[0];
+
+ // elements generate a watermark if the timestamp is a multiple of three
+ fetcher.emitRecord(1L, partitionStateHolder, 1L);
+ fetcher.emitRecord(2L, partitionStateHolder, 2L);
+ fetcher.emitRecord(3L, partitionStateHolder, 3L);
+ assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+ assertTrue(sourceContext.hasWatermark());
+ assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+ assertEquals(3L, partitionStateHolder.getOffset());
+
+ // emit null record
+ fetcher.emitRecord(null, partitionStateHolder, 4L);
+
+ // no elements or watermarks should have been collected
+ assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+ assertFalse(sourceContext.hasWatermark());
+ // the offset in state still should have advanced
+ assertEquals(4L, partitionStateHolder.getOffset());
+ }
+
+ @Test
+ public void testSkipCorruptedRecordWithPeriodicWatermarks() throws Exception {
+ final String testTopic = "test topic name";
+ Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+ TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+ TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
+
+ TestFetcher<Long> fetcher = new TestFetcher<>(
+ sourceContext,
+ originalPartitions,
+ new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), /* periodic watermark assigner */
+ null, /* punctuated watermark assigner */
+ processingTimeProvider,
+ 10);
+
+ final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates()[0];
+
+ // elements only update the assigner's max timestamp; watermarks are emitted once the processing time is advanced
+ fetcher.emitRecord(1L, partitionStateHolder, 1L);
+ fetcher.emitRecord(2L, partitionStateHolder, 2L);
+ fetcher.emitRecord(3L, partitionStateHolder, 3L);
+ assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+ assertEquals(3L, partitionStateHolder.getOffset());
+
+ // advance timer for watermark emitting
+ processingTimeProvider.setCurrentTime(10L);
+ assertTrue(sourceContext.hasWatermark());
+ assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // emit null record
+ fetcher.emitRecord(null, partitionStateHolder, 4L);
+
+ // no elements should have been collected
+ assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+ // the offset in state still should have advanced
+ assertEquals(4L, partitionStateHolder.getOffset());
+
+ // no watermarks should be collected
+ processingTimeProvider.setCurrentTime(20L);
+ assertFalse(sourceContext.hasWatermark());
+ }
+
+ // ------------------------------------------------------------------------
+ // Timestamps & watermarks tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testPunctuatedWatermarks() throws Exception {
+ final String testTopic = "test topic name";
+ Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+ TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+ TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
+
+ TestFetcher<Long> fetcher = new TestFetcher<>(
+ sourceContext,
+ originalPartitions,
+ null, /* periodic watermark assigner */
+ new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
+ processingTimeProvider,
+ 0);
+
+ final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
+ final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
+ final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
+
+ // elements generate a watermark if the timestamp is a multiple of three
+
+ // elements for partition 1
+ fetcher.emitRecord(1L, part1, 1L);
+ fetcher.emitRecord(2L, part1, 2L);
+ fetcher.emitRecord(3L, part1, 3L);
+ assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+ assertFalse(sourceContext.hasWatermark());
+
+ // elements for partition 2
+ fetcher.emitRecord(12L, part2, 1L);
+ assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+ assertFalse(sourceContext.hasWatermark());
+
+ // elements for partition 3
+ fetcher.emitRecord(101L, part3, 1L);
+ fetcher.emitRecord(102L, part3, 2L);
+ assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+ // now, we should have a watermark
+ assertTrue(sourceContext.hasWatermark());
+ assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 3
+ fetcher.emitRecord(1003L, part3, 3L);
+ fetcher.emitRecord(1004L, part3, 4L);
+ fetcher.emitRecord(1005L, part3, 5L);
+ assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+ // advance partition 1 beyond partition 2 - this bumps the watermark
+ fetcher.emitRecord(30L, part1, 4L);
+ assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+ assertTrue(sourceContext.hasWatermark());
+ assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 2 again - this bumps the watermark
+ fetcher.emitRecord(13L, part2, 2L);
+ assertFalse(sourceContext.hasWatermark());
+ fetcher.emitRecord(14L, part2, 3L);
+ assertFalse(sourceContext.hasWatermark());
+ fetcher.emitRecord(15L, part2, 3L);
+ assertTrue(sourceContext.hasWatermark());
+ assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
+ }
+
+ @Test
+ public void testPeriodicWatermarks() throws Exception {
+ final String testTopic = "test topic name";
+ Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+ TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+ TestFetcher<Long> fetcher = new TestFetcher<>(
+ sourceContext,
+ originalPartitions,
+ new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
+ null, /* punctuated watermarks assigner*/
+ processingTimeService,
+ 10);
+
+ final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
+ final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
+ final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
+
+ // elements only update the assigner's max timestamp; watermarks are emitted once the processing time is advanced
+
+ // elements for partition 1
+ fetcher.emitRecord(1L, part1, 1L);
+ fetcher.emitRecord(2L, part1, 2L);
+ fetcher.emitRecord(3L, part1, 3L);
+ assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+
+ // elements for partition 2
+ fetcher.emitRecord(12L, part2, 1L);
+ assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+
+ // elements for partition 3
+ fetcher.emitRecord(101L, part3, 1L);
+ fetcher.emitRecord(102L, part3, 2L);
+ assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+ processingTimeService.setCurrentTime(10);
+
+ // now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
+ assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 3
+ fetcher.emitRecord(1003L, part3, 3L);
+ fetcher.emitRecord(1004L, part3, 4L);
+ fetcher.emitRecord(1005L, part3, 5L);
+ assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+ // advance partition 1 beyond partition 2 - this bumps the watermark
+ fetcher.emitRecord(30L, part1, 4L);
+ assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+
+ processingTimeService.setCurrentTime(20);
+
+ // this blocks until the periodic thread emitted the watermark
+ assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 2 again - this bumps the watermark
+ fetcher.emitRecord(13L, part2, 2L);
+ fetcher.emitRecord(14L, part2, 3L);
+ fetcher.emitRecord(15L, part2, 3L);
+
+ processingTimeService.setCurrentTime(30);
+ // this blocks until the periodic thread emitted the watermark
+ long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
+ assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+ }
+
+ // ------------------------------------------------------------------------
+ // Test mocks
+ // ------------------------------------------------------------------------
+
+ private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+
+ protected TestFetcher(
+ SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ ProcessingTimeService processingTimeProvider,
+ long autoWatermarkInterval) throws Exception
+ {
+ super(
+ sourceContext,
+ assignedPartitionsWithStartOffsets,
+ watermarksPeriodic,
+ watermarksPunctuated,
+ processingTimeProvider,
+ autoWatermarkInterval,
+ TestFetcher.class.getClassLoader(),
+ false);
+ }
+
+ @Override
+ public void runFetchLoop() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void cancel() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
+ return new Object();
+ }
+
+ @Override
+ public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class TestSourceContext<T> implements SourceContext<T> {
+
+ private final Object checkpointLock = new Object();
+ private final Object watermarkLock = new Object();
+
+ private volatile StreamRecord<T> latestElement;
+ private volatile Watermark currentWatermark;
+
+ @Override
+ public void collect(T element) {
+ this.latestElement = new StreamRecord<>(element);
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ this.latestElement = new StreamRecord<>(element, timestamp);
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ synchronized (watermarkLock) {
+ currentWatermark = mark;
+ watermarkLock.notifyAll();
+ }
+ }
+
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return checkpointLock;
+ }
+
+ @Override
+ public void close() {}
+
+ public StreamRecord<T> getLatestElement() {
+ return latestElement;
+ }
+
+ public boolean hasWatermark() {
+ return currentWatermark != null;
+ }
+
+ public Watermark getLatestWatermark() throws InterruptedException {
+ synchronized (watermarkLock) {
+ while (currentWatermark == null) {
+ watermarkLock.wait();
+ }
+ Watermark wm = currentWatermark;
+ currentWatermark = null;
+ return wm;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
+
+ private volatile long maxTimestamp = Long.MIN_VALUE;
+
+ @Override
+ public long extractTimestamp(Long element, long previousElementTimestamp) {
+ maxTimestamp = Math.max(maxTimestamp, element);
+ return element;
+ }
+
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return new Watermark(maxTimestamp);
+ }
+ }
+
+ private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
+
+ @Override
+ public long extractTimestamp(Long element, long previousElementTimestamp) {
+ return element;
+ }
+
+ @Nullable
+ @Override
+ public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+ return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
deleted file mode 100644
index 17a375d..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-@SuppressWarnings("serial")
-public class AbstractFetcherTimestampsTest {
-
- /**
-  * Feeds records into three partitions through a fetcher configured with
-  * {@code PunctuatedTestExtractor} (watermark only for timestamps divisible by three)
-  * and asserts, step by step, the latest collected element and whether the source
-  * context holds a watermark, and with which timestamp.
-  */
- @Test
- public void testPunctuatedWatermarks() throws Exception {
- final String testTopic = "test topic name";
- Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
- originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
- originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
- originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
-
- TestSourceContext<Long> sourceContext = new TestSourceContext<>();
-
- TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
-
- TestFetcher<Long> fetcher = new TestFetcher<>(
- sourceContext,
- originalPartitions,
- null, /* periodic watermark assigner */
- new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
- processingTimeProvider,
- 0); // autoWatermarkInterval
-
- // NOTE(review): part1..part3 are picked by array index; which of the partitions 7/13/21
- // each one refers to presumably depends on the fetcher's ordering - the test only relies
- // on them being three distinct partition states
- final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
- final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
- final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
-
- // elements generate a watermark if the timestamp is a multiple of three
-
- // elements for partition 1
- fetcher.emitRecord(1L, part1, 1L);
- fetcher.emitRecord(2L, part1, 2L);
- fetcher.emitRecord(3L, part1, 3L);
- assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
- // partition 1 has seen timestamp 3, but no watermark is expected at the context yet
- assertFalse(sourceContext.hasWatermark());
-
- // elements for partition 2
- fetcher.emitRecord(12L, part2, 1L);
- assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
- // partitions 1 and 2 have each seen a multiple of three; still no watermark expected
- assertFalse(sourceContext.hasWatermark());
-
- // elements for partition 3
- fetcher.emitRecord(101L, part3, 1L);
- fetcher.emitRecord(102L, part3, 2L);
- assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
-
- // now, we should have a watermark: every partition has seen a multiple of three
- // (3, 12, 102) and the expected value is the smallest of them
- assertTrue(sourceContext.hasWatermark());
- assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
-
- // advance partition 3
- fetcher.emitRecord(1003L, part3, 3L);
- fetcher.emitRecord(1004L, part3, 4L);
- fetcher.emitRecord(1005L, part3, 5L);
- assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
-
- // advance partition 1 beyond partition 2 - this bumps the watermark
- // (expected value 12 is the timestamp last emitted for partition 2)
- fetcher.emitRecord(30L, part1, 4L);
- assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
- assertTrue(sourceContext.hasWatermark());
- assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
-
- // advance partition 2 again - this bumps the watermark
- // (13 and 14 are not multiples of three, so only 15 is expected to produce one)
- fetcher.emitRecord(13L, part2, 2L);
- assertFalse(sourceContext.hasWatermark());
- fetcher.emitRecord(14L, part2, 3L);
- assertFalse(sourceContext.hasWatermark());
- // NOTE(review): the third argument (3L) repeats the value used for the previous record;
- // presumably it is the offset and does not influence the watermark assertions - confirm
- fetcher.emitRecord(15L, part2, 3L);
- assertTrue(sourceContext.hasWatermark());
- assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
- }
-
- /**
-  * Feeds records into three partitions through a fetcher configured with
-  * {@code PeriodicTestExtractor} and an auto-watermark interval of 10, advances the
-  * test processing time in steps of 10, and asserts the watermark the source context
-  * receives after each step.
-  */
- @Test
- public void testPeriodicWatermarks() throws Exception {
- final String testTopic = "test topic name";
- Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
- originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
- originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
- originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
-
- TestSourceContext<Long> sourceContext = new TestSourceContext<>();
-
- TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-
- TestFetcher<Long> fetcher = new TestFetcher<>(
- sourceContext,
- originalPartitions,
- new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
- null, /* punctuated watermarks assigner*/
- processingTimeService,
- 10); // autoWatermarkInterval
-
- final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
- final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
- final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
-
- // the periodic assigner reports the highest timestamp it has extracted as its watermark; a watermark is only checked after the processing time has been advanced
-
- // elements for partition 1
- fetcher.emitRecord(1L, part1, 1L);
- fetcher.emitRecord(2L, part1, 2L);
- fetcher.emitRecord(3L, part1, 3L);
- assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
-
- // elements for partition 2
- fetcher.emitRecord(12L, part2, 1L);
- assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
-
- // elements for partition 3
- fetcher.emitRecord(101L, part3, 1L);
- fetcher.emitRecord(102L, part3, 2L);
- assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
-
- processingTimeService.setCurrentTime(10);
-
- // now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
- // expected value 3 is the smallest of the per-partition maxima (3, 12, 102)
- assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
-
- // advance partition 3
- fetcher.emitRecord(1003L, part3, 3L);
- fetcher.emitRecord(1004L, part3, 4L);
- fetcher.emitRecord(1005L, part3, 5L);
- assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
-
- // advance partition 1 beyond partition 2 - this bumps the watermark
- fetcher.emitRecord(30L, part1, 4L);
- assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
-
- processingTimeService.setCurrentTime(20);
-
- // this blocks until the periodic thread emitted the watermark
- // expected value 12 is the highest timestamp emitted for partition 2 so far
- assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
-
- // advance partition 2 again - this bumps the watermark
- fetcher.emitRecord(13L, part2, 2L);
- fetcher.emitRecord(14L, part2, 3L);
- fetcher.emitRecord(15L, part2, 3L);
-
- processingTimeService.setCurrentTime(30);
- // this blocks until the periodic thread emitted the watermark
- long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
- // NOTE(review): a range is accepted instead of an exact value; all three records for
- // partition 2 are emitted before the time is advanced, so 15 looks like the expected
- // result - confirm why 13 and 14 are tolerated
- assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
- }
-
- // ------------------------------------------------------------------------
- // Test mocks
- // ------------------------------------------------------------------------
-
- /**
-  * Bare-bones {@link AbstractFetcher} for these tests. Only
-  * {@link #createKafkaPartitionHandle} is implemented; the fetch loop, cancellation
-  * and offset committing all throw {@link UnsupportedOperationException}.
-  */
- private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
-
-     /**
-      * Forwards all arguments to the {@link AbstractFetcher} constructor, adding this
-      * class's class loader and {@code false} for the final boolean argument.
-      */
-     protected TestFetcher(
-             SourceContext<T> sourceContext,
-             Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
-             SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-             SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-             ProcessingTimeService processingTimeProvider,
-             long autoWatermarkInterval) throws Exception {
-         super(
-             sourceContext, assignedPartitionsWithStartOffsets,
-             watermarksPeriodic, watermarksPunctuated,
-             processingTimeProvider, autoWatermarkInterval,
-             TestFetcher.class.getClassLoader(),
-             false);
-     }
-
-     // ---- the one operation the tests rely on ----
-
-     @Override
-     public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
-         // any object will do as a partition handle here
-         return new Object();
-     }
-
-     // ---- operations the tests never call ----
-
-     @Override
-     public void runFetchLoop() throws Exception {
-         throw new UnsupportedOperationException();
-     }
-
-     @Override
-     public void cancel() {
-         throw new UnsupportedOperationException();
-     }
-
-     @Override
-     public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
-         throw new UnsupportedOperationException();
-     }
- }
-
- // ------------------------------------------------------------------------
-
- /**
-  * Source context stub that remembers only the most recent timestamped element and
-  * the most recent watermark, and lets a test block until a watermark arrives.
-  */
- private static final class TestSourceContext<T> implements SourceContext<T> {
-
-     /** Returned by {@link #getCheckpointLock()}. */
-     private final Object checkpointLock = new Object();
-
-     /** Monitor used to hand {@code currentWatermark} from the emitter to a waiting test. */
-     private final Object watermarkLock = new Object();
-
-     private volatile StreamRecord<T> latestElement;
-     private volatile Watermark currentWatermark;
-
-     // ---- SourceContext methods ----
-
-     @Override
-     public void collect(T element) {
-         throw new UnsupportedOperationException();
-     }
-
-     @Override
-     public void collectWithTimestamp(T element, long timestamp) {
-         latestElement = new StreamRecord<>(element, timestamp);
-     }
-
-     @Override
-     public void emitWatermark(Watermark mark) {
-         synchronized (watermarkLock) {
-             currentWatermark = mark;
-             // wake up any thread blocked in getLatestWatermark()
-             watermarkLock.notifyAll();
-         }
-     }
-
-     @Override
-     public void markAsTemporarilyIdle() {
-         throw new UnsupportedOperationException();
-     }
-
-     @Override
-     public Object getCheckpointLock() {
-         return checkpointLock;
-     }
-
-     @Override
-     public void close() {}
-
-     // ---- accessors used by the tests ----
-
-     /** Returns the element most recently passed to {@link #collectWithTimestamp}, or null if none. */
-     public StreamRecord<T> getLatestElement() {
-         return latestElement;
-     }
-
-     /** Tells whether a watermark has been emitted and not yet taken by {@link #getLatestWatermark()}. */
-     public boolean hasWatermark() {
-         return null != currentWatermark;
-     }
-
-     /**
-      * Waits until a watermark is stored, then returns it and clears the slot.
-      *
-      * @throws InterruptedException if the calling thread is interrupted while waiting
-      */
-     public Watermark getLatestWatermark() throws InterruptedException {
-         synchronized (watermarkLock) {
-             for (;;) {
-                 final Watermark pending = currentWatermark;
-                 if (pending != null) {
-                     currentWatermark = null;
-                     return pending;
-                 }
-                 watermarkLock.wait();
-             }
-         }
-     }
- }
-
- // ------------------------------------------------------------------------
-
- /**
-  * Periodic watermark assigner for tests: an element's own value is its timestamp,
-  * and the watermark is the largest timestamp extracted so far.
-  */
- private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
-
-     /** Largest timestamp extracted so far; {@code Long.MIN_VALUE} until the first element. */
-     private volatile long maxTimestamp = Long.MIN_VALUE;
-
-     @Override
-     public long extractTimestamp(Long element, long previousElementTimestamp) {
-         final long timestamp = element;
-         if (timestamp > maxTimestamp) {
-             maxTimestamp = timestamp;
-         }
-         return timestamp;
-     }
-
-     @Nullable
-     @Override
-     public Watermark getCurrentWatermark() {
-         final long highestSeen = maxTimestamp;
-         return new Watermark(highestSeen);
-     }
- }
-
- /**
-  * Punctuated watermark assigner for tests: an element's own value is its timestamp,
-  * and a watermark is produced only when that timestamp is divisible by three.
-  */
- private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
-
-     @Override
-     public long extractTimestamp(Long element, long previousElementTimestamp) {
-         return element.longValue();
-     }
-
-     @Nullable
-     @Override
-     public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
-         if (extractedTimestamp % 3 != 0) {
-             // this element does not trigger a watermark
-             return null;
-         }
-         return new Watermark(extractedTimestamp);
-     }
- }
-}