Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/27 17:49:24 UTC
[1/3] flink git commit: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to open()
Repository: flink
Updated Branches:
refs/heads/master 72f56d1fb -> 8bcb2ae3c
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 9beed22..3bdfbed 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.junit.Test;
@@ -25,6 +26,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
import java.util.List;
import java.util.Set;
@@ -45,12 +48,18 @@ public class KafkaConsumerPartitionAssignmentTest {
new KafkaTopicPartition("test-topic", 1));
for (int i = 0; i < inPartitions.size(); i++) {
- List<KafkaTopicPartition> parts =
- FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
-
- assertNotNull(parts);
- assertEquals(1, parts.size());
- assertTrue(contains(inPartitions, parts.get(0).getPartition()));
+ Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
+ FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets,
+ inPartitions,
+ i,
+ inPartitions.size(),
+ StartupMode.GROUP_OFFSETS);
+
+ List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
+
+ assertEquals(1, subscribedPartitions.size());
+ assertTrue(contains(inPartitions, subscribedPartitions.get(0).getPartition()));
}
}
catch (Exception e) {
@@ -59,15 +68,6 @@ public class KafkaConsumerPartitionAssignmentTest {
}
}
- private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) {
- for (KafkaTopicPartition ktp : inPartitions) {
- if (ktp.getPartition() == partition) {
- return true;
- }
- }
- return false;
- }
-
@Test
public void testMultiplePartitionsPerConsumers() {
try {
@@ -87,14 +87,20 @@ public class KafkaConsumerPartitionAssignmentTest {
final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
for (int i = 0; i < numConsumers; i++) {
- List<KafkaTopicPartition> parts =
- FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+ Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
+ FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets,
+ partitions,
+ i,
+ numConsumers,
+ StartupMode.GROUP_OFFSETS);
+
+ List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
- assertNotNull(parts);
- assertTrue(parts.size() >= minPartitionsPerConsumer);
- assertTrue(parts.size() <= maxPartitionsPerConsumer);
+ assertTrue(subscribedPartitions.size() >= minPartitionsPerConsumer);
+ assertTrue(subscribedPartitions.size() <= maxPartitionsPerConsumer);
- for (KafkaTopicPartition p : parts) {
+ for (KafkaTopicPartition p : subscribedPartitions) {
// check that the element was actually contained
assertTrue(allPartitions.remove(p));
}
@@ -124,12 +130,19 @@ public class KafkaConsumerPartitionAssignmentTest {
final int numConsumers = 2 * inPartitions.size() + 3;
for (int i = 0; i < numConsumers; i++) {
- List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+ Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
+ FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets,
+ inPartitions,
+ i,
+ numConsumers,
+ StartupMode.GROUP_OFFSETS);
+
+ List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
- assertNotNull(parts);
- assertTrue(parts.size() <= 1);
+ assertTrue(subscribedPartitions.size() <= 1);
- for (KafkaTopicPartition p : parts) {
+ for (KafkaTopicPartition p : subscribedPartitions) {
// check that the element was actually contained
assertTrue(allPartitions.remove(p));
}
@@ -148,13 +161,23 @@ public class KafkaConsumerPartitionAssignmentTest {
public void testAssignEmptyPartitions() {
try {
List<KafkaTopicPartition> ep = new ArrayList<>();
- List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
- assertNotNull(parts1);
- assertTrue(parts1.isEmpty());
-
- List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
- assertNotNull(parts2);
- assertTrue(parts2.isEmpty());
+ Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
+ FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets,
+ ep,
+ 2,
+ 4,
+ StartupMode.GROUP_OFFSETS);
+ assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
+
+ subscribedPartitionsToStartOffsets = new HashMap<>();
+ FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets,
+ ep,
+ 0,
+ 1,
+ StartupMode.GROUP_OFFSETS);
+ assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
}
catch (Exception e) {
e.printStackTrace();
@@ -184,33 +207,53 @@ public class KafkaConsumerPartitionAssignmentTest {
final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
- List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(
- initialPartitions, numConsumers, 0);
- List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(
- initialPartitions, numConsumers, 1);
- List<KafkaTopicPartition> parts3 = FlinkKafkaConsumerBase.assignPartitions(
- initialPartitions, numConsumers, 2);
-
- assertNotNull(parts1);
- assertNotNull(parts2);
- assertNotNull(parts3);
-
- assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
- assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
- assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
- assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
-
- for (KafkaTopicPartition p : parts1) {
+ Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets1 = new HashMap<>();
+ Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets2 = new HashMap<>();
+ Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets3 = new HashMap<>();
+
+ FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets1,
+ initialPartitions,
+ 0,
+ numConsumers,
+ StartupMode.GROUP_OFFSETS);
+
+ FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets2,
+ initialPartitions,
+ 1,
+ numConsumers,
+ StartupMode.GROUP_OFFSETS);
+
+ FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets3,
+ initialPartitions,
+ 2,
+ numConsumers,
+ StartupMode.GROUP_OFFSETS);
+
+ List<KafkaTopicPartition> subscribedPartitions1 = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
+ List<KafkaTopicPartition> subscribedPartitions2 = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
+ List<KafkaTopicPartition> subscribedPartitions3 = new ArrayList<>(subscribedPartitionsToStartOffsets3.keySet());
+
+ assertTrue(subscribedPartitions1.size() >= minInitialPartitionsPerConsumer);
+ assertTrue(subscribedPartitions1.size() <= maxInitialPartitionsPerConsumer);
+ assertTrue(subscribedPartitions2.size() >= minInitialPartitionsPerConsumer);
+ assertTrue(subscribedPartitions2.size() <= maxInitialPartitionsPerConsumer);
+ assertTrue(subscribedPartitions3.size() >= minInitialPartitionsPerConsumer);
+ assertTrue(subscribedPartitions3.size() <= maxInitialPartitionsPerConsumer);
+
+ for (KafkaTopicPartition p : subscribedPartitions1) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p));
}
- for (KafkaTopicPartition p : parts2) {
+
+ for (KafkaTopicPartition p : subscribedPartitions2) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p));
}
- for (KafkaTopicPartition p : parts3) {
+
+ for (KafkaTopicPartition p : subscribedPartitions3) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p));
}
@@ -220,39 +263,61 @@ public class KafkaConsumerPartitionAssignmentTest {
// grow the set of partitions and distribute anew
- List<KafkaTopicPartition> parts1new = FlinkKafkaConsumerBase.assignPartitions(
- newPartitions, numConsumers, 0);
- List<KafkaTopicPartition> parts2new = FlinkKafkaConsumerBase.assignPartitions(
- newPartitions, numConsumers, 1);
- List<KafkaTopicPartition> parts3new = FlinkKafkaConsumerBase.assignPartitions(
- newPartitions, numConsumers, 2);
+ subscribedPartitionsToStartOffsets1 = new HashMap<>();
+ subscribedPartitionsToStartOffsets2 = new HashMap<>();
+ subscribedPartitionsToStartOffsets3 = new HashMap<>();
+
+ FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets1,
+ newPartitions,
+ 0,
+ numConsumers,
+ StartupMode.GROUP_OFFSETS);
+
+ FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets2,
+ newPartitions,
+ 1,
+ numConsumers,
+ StartupMode.GROUP_OFFSETS);
+
+ FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets3,
+ newPartitions,
+ 2,
+ numConsumers,
+ StartupMode.GROUP_OFFSETS);
+
+ List<KafkaTopicPartition> subscribedPartitions1New = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
+ List<KafkaTopicPartition> subscribedPartitions2New = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
+ List<KafkaTopicPartition> subscribedPartitions3New = new ArrayList<>(subscribedPartitionsToStartOffsets3.keySet());
// new partitions must include all old partitions
- assertTrue(parts1new.size() > parts1.size());
- assertTrue(parts2new.size() > parts2.size());
- assertTrue(parts3new.size() > parts3.size());
+ assertTrue(subscribedPartitions1New.size() > subscribedPartitions1.size());
+ assertTrue(subscribedPartitions2New.size() > subscribedPartitions2.size());
+ assertTrue(subscribedPartitions3New.size() > subscribedPartitions3.size());
- assertTrue(parts1new.containsAll(parts1));
- assertTrue(parts2new.containsAll(parts2));
- assertTrue(parts3new.containsAll(parts3));
+ assertTrue(subscribedPartitions1New.containsAll(subscribedPartitions1));
+ assertTrue(subscribedPartitions2New.containsAll(subscribedPartitions2));
+ assertTrue(subscribedPartitions3New.containsAll(subscribedPartitions3));
- assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
- assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
- assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
- assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
- assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
- assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
+ assertTrue(subscribedPartitions1New.size() >= minNewPartitionsPerConsumer);
+ assertTrue(subscribedPartitions1New.size() <= maxNewPartitionsPerConsumer);
+ assertTrue(subscribedPartitions2New.size() >= minNewPartitionsPerConsumer);
+ assertTrue(subscribedPartitions2New.size() <= maxNewPartitionsPerConsumer);
+ assertTrue(subscribedPartitions3New.size() >= minNewPartitionsPerConsumer);
+ assertTrue(subscribedPartitions3New.size() <= maxNewPartitionsPerConsumer);
- for (KafkaTopicPartition p : parts1new) {
+ for (KafkaTopicPartition p : subscribedPartitions1New) {
// check that the element was actually contained
assertTrue(allNewPartitions.remove(p));
}
- for (KafkaTopicPartition p : parts2new) {
+ for (KafkaTopicPartition p : subscribedPartitions2New) {
// check that the element was actually contained
assertTrue(allNewPartitions.remove(p));
}
- for (KafkaTopicPartition p : parts3new) {
+ for (KafkaTopicPartition p : subscribedPartitions3New) {
// check that the element was actually contained
assertTrue(allNewPartitions.remove(p));
}
@@ -266,4 +331,13 @@ public class KafkaConsumerPartitionAssignmentTest {
}
}
+ private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) {
+ for (KafkaTopicPartition ktp : inPartitions) {
+ if (ktp.getPartition() == partition) {
+ return true;
+ }
+ }
+ return false;
+ }
+
}
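
As the updated tests above show, the old static assignPartitions (which returned a List of partitions) is replaced by initializeSubscribedPartitionsToStartOffsets, which fills a Map from partition to an initial start offset chosen from the StartupMode. The committed method lives in FlinkKafkaConsumerBase; the standalone sketch below is a reconstruction from the call sites above only — the round-robin ownership rule, the String keys, and the sentinel values are assumptions, not the committed body.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {

    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    // Placeholder sentinels; Flink's real constants live in KafkaTopicPartitionStateSentinel.
    static final long EARLIEST_OFFSET = -2L;
    static final long LATEST_OFFSET = -3L;
    static final long GROUP_OFFSET = -1L;

    // Round-robin sketch: partition i is owned by subtask (i % numParallelSubtasks),
    // and each owned partition starts at the sentinel matching the startup mode.
    static void initializeSubscribedPartitionsToStartOffsets(
            Map<String, Long> subscribedPartitionsToStartOffsets,
            List<String> kafkaTopicPartitions,
            int indexOfThisSubtask,
            int numParallelSubtasks,
            StartupMode startupMode) {

        final long sentinel;
        switch (startupMode) {
            case EARLIEST: sentinel = EARLIEST_OFFSET; break;
            case LATEST:   sentinel = LATEST_OFFSET;   break;
            default:       sentinel = GROUP_OFFSET;    break;
        }

        for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
            if (i % numParallelSubtasks == indexOfThisSubtask) {
                subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), sentinel);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> owned = new HashMap<>();
        initializeSubscribedPartitionsToStartOffsets(
                owned, Arrays.asList("t-0", "t-1", "t-2", "t-3"), 1, 2, StartupMode.GROUP_OFFSETS);
        System.out.println(owned); // {t-1=-1, t-3=-1} (map iteration order may vary)
    }
}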
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 6887518..9e9923d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -44,10 +44,10 @@ public class AbstractFetcherTimestampsTest {
@Test
public void testPunctuatedWatermarks() throws Exception {
final String testTopic = "test topic name";
- List<KafkaTopicPartition> originalPartitions = Arrays.asList(
- new KafkaTopicPartition(testTopic, 7),
- new KafkaTopicPartition(testTopic, 13),
- new KafkaTopicPartition(testTopic, 21));
+ Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
@@ -56,15 +56,14 @@ public class AbstractFetcherTimestampsTest {
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
- null,
null, /* periodic watermark assigner */
new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
processingTimeProvider,
0);
- final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
- final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
- final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+ final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
+ final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
+ final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
// elements generate a watermark if the timestamp is a multiple of three
@@ -119,10 +118,10 @@ public class AbstractFetcherTimestampsTest {
@Test
public void testPeriodicWatermarks() throws Exception {
final String testTopic = "test topic name";
- List<KafkaTopicPartition> originalPartitions = Arrays.asList(
- new KafkaTopicPartition(testTopic, 7),
- new KafkaTopicPartition(testTopic, 13),
- new KafkaTopicPartition(testTopic, 21));
+ Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
@@ -131,15 +130,14 @@ public class AbstractFetcherTimestampsTest {
TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
- null,
new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
null, /* punctuated watermarks assigner*/
processingTimeService,
10);
- final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
- final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
- final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+ final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates()[0];
+ final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates()[1];
+ final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
// elements generate a watermark if the timestamp is a multiple of three
@@ -202,8 +200,7 @@ public class AbstractFetcherTimestampsTest {
protected TestFetcher(
SourceContext<T> sourceContext,
- List<KafkaTopicPartition> assignedPartitions,
- HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
@@ -211,14 +208,12 @@ public class AbstractFetcherTimestampsTest {
{
super(
sourceContext,
- assignedPartitions,
- restoredSnapshotState,
+ assignedPartitionsWithStartOffsets,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
TestFetcher.class.getClassLoader(),
- StartupMode.LATEST,
false);
}
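
The KafkaTopicPartitionStateSentinel.LATEST_OFFSET and GROUP_OFFSET constants used in the test diffs above come from a new class this commit introduces (it appears in the diffstat of the next message, 55 lines added). The excerpt does not show its body; a minimal sketch of its likely shape, with placeholder values in place of the committed magic constants:

public class KafkaTopicPartitionStateSentinel {

    // Start from the consumer group's committed offset (Kafka brokers, or ZK for 0.8).
    public static final long GROUP_OFFSET = -1L;

    // Start from the earliest available offset of the partition.
    public static final long EARLIEST_OFFSET = -2L;

    // Start from the latest offset of the partition.
    public static final long LATEST_OFFSET = -3L;

    private KafkaTopicPartitionStateSentinel() {}
}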
[2/3] flink git commit: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to open()
Posted by tz...@apache.org.
[FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to open()
This closes #3378.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed68fedb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed68fedb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed68fedb
Branch: refs/heads/master
Commit: ed68fedbe90db03823d75a020510ad3c344fa73e
Parents: 72f56d1
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Feb 21 23:05:32 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Feb 28 00:54:48 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumer010.java | 9 +-
.../kafka/internal/Kafka010Fetcher.java | 12 +-
.../internal/KafkaConsumerCallBridge010.java | 9 +-
.../connectors/kafka/Kafka010FetcherTest.java | 23 +-
.../connectors/kafka/FlinkKafkaConsumer08.java | 22 +-
.../kafka/internals/Kafka08Fetcher.java | 77 +++----
.../kafka/internals/ZookeeperOffsetHandler.java | 18 +-
.../connectors/kafka/FlinkKafkaConsumer09.java | 9 +-
.../kafka/internal/Kafka09Fetcher.java | 17 +-
.../kafka/internal/KafkaConsumerCallBridge.java | 12 +-
.../kafka/internal/KafkaConsumerThread.java | 79 ++-----
.../connectors/kafka/Kafka09FetcherTest.java | 23 +-
.../kafka/FlinkKafkaConsumerBase.java | 187 ++++++++--------
.../connectors/kafka/config/StartupMode.java | 20 +-
.../kafka/internals/AbstractFetcher.java | 89 ++++----
.../internals/KafkaTopicPartitionState.java | 10 +-
.../KafkaTopicPartitionStateSentinel.java | 55 +++++
.../FlinkKafkaConsumerBaseMigrationTest.java | 33 ++-
.../kafka/FlinkKafkaConsumerBaseTest.java | 20 +-
.../KafkaConsumerPartitionAssignmentTest.java | 222 ++++++++++++-------
.../AbstractFetcherTimestampsTest.java | 37 ++--
21 files changed, 510 insertions(+), 473 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 3a58216..716fa19 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW
import org.apache.flink.util.SerializedValue;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.Map;
import java.util.List;
import java.util.Properties;
@@ -128,8 +128,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
- List<KafkaTopicPartition> thisSubtaskPartitions,
- HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext) throws Exception {
@@ -138,8 +137,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
return new Kafka010Fetcher<>(
sourceContext,
- thisSubtaskPartitions,
- restoredSnapshotState,
+ assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
@@ -151,7 +149,6 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
deserializer,
properties,
pollTimeout,
- startupMode,
useMetrics);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index efb6f88..da6ecd0 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -22,7 +22,6 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -32,8 +31,7 @@ import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
import java.util.Properties;
/**
@@ -48,8 +46,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
public Kafka010Fetcher(
SourceContext<T> sourceContext,
- List<KafkaTopicPartition> assignedPartitions,
- HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
@@ -61,13 +58,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
- StartupMode startupMode,
boolean useMetrics) throws Exception
{
super(
sourceContext,
- assignedPartitions,
- restoredSnapshotState,
+ assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
@@ -79,7 +74,6 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
deserializer,
kafkaProperties,
pollTimeout,
- startupMode,
useMetrics);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
index 1e0bc5b..0fda9a6 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
+import java.util.Collections;
import java.util.List;
/**
@@ -39,12 +40,12 @@ public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
}
@Override
- public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
- consumer.seekToBeginning(partitions);
+ public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
+ consumer.seekToBeginning(Collections.singletonList(partition));
}
@Override
- public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
- consumer.seekToEnd(partitions);
+ public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
+ consumer.seekToEnd(Collections.singletonList(partition));
}
}
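
The bridge methods become per-partition because the sentinel-resolution loop in KafkaConsumerThread (later in this patch) seeks one partition at a time. The 0.10 client's seekToBeginning/seekToEnd take a Collection rather than the 0.9 varargs form, hence the singletonList wrapping here, while the 0.9 base class further below passes the partition directly. A minimal sketch of the 0.10-style call, assuming the 0.10 kafka-clients API on the classpath:

import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class SeekSketch {
    // Kafka 0.10 replaced seekToBeginning(TopicPartition...) with
    // seekToBeginning(Collection<TopicPartition>), so a single-partition
    // seek wraps the partition in a singleton list.
    static void seekToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
        consumer.seekToBeginning(Collections.singletonList(partition));
    }
}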
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 98aa28a..17ba712 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -24,10 +24,10 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -118,13 +118,13 @@ public class Kafka010FetcherTest {
@SuppressWarnings("unchecked")
SourceContext<String> sourceContext = mock(SourceContext.class);
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+ Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
- topics,
- null, /* no restored state */
+ partitionsWithInitialOffsets,
null, /* periodic assigner */
null, /* punctuated assigner */
new TestProcessingTimeService(),
@@ -136,7 +136,6 @@ public class Kafka010FetcherTest {
schema,
new Properties(),
0L,
- StartupMode.GROUP_OFFSETS,
false);
// ----- run the fetcher -----
@@ -256,13 +255,13 @@ public class Kafka010FetcherTest {
@SuppressWarnings("unchecked")
SourceContext<String> sourceContext = mock(SourceContext.class);
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+ Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
- topics,
- null, /* no restored state */
+ partitionsWithInitialOffsets,
null, /* periodic assigner */
null, /* punctuated assigner */
new TestProcessingTimeService(),
@@ -274,7 +273,6 @@ public class Kafka010FetcherTest {
schema,
new Properties(),
0L,
- StartupMode.GROUP_OFFSETS,
false);
// ----- run the fetcher -----
@@ -372,13 +370,13 @@ public class Kafka010FetcherTest {
// ----- build a fetcher -----
BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+ Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+ Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
- topics,
- null, /* no restored state */
+ partitionsWithInitialOffsets,
null, /* periodic watermark extractor */
null, /* punctuated watermark extractor */
new TestProcessingTimeService(),
@@ -390,7 +388,6 @@ public class Kafka010FetcherTest {
schema,
new Properties(),
0L,
- StartupMode.GROUP_OFFSETS,
false);
// ----- run the fetcher -----
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index c0e4dd7..bf7ed02 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -45,10 +45,10 @@ import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
-import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.Random;
@@ -194,19 +194,23 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
- List<KafkaTopicPartition> thisSubtaskPartitions,
- HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext) throws Exception {
boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false"));
- return new Kafka08Fetcher<>(sourceContext,
- thisSubtaskPartitions, restoredSnapshotState,
- watermarksPeriodic, watermarksPunctuated,
- runtimeContext, deserializer, kafkaProperties,
- autoCommitInterval, startupMode, useMetrics);
+ return new Kafka08Fetcher<>(
+ sourceContext,
+ assignedPartitionsWithInitialOffsets,
+ watermarksPeriodic,
+ watermarksPunctuated,
+ runtimeContext,
+ deserializer,
+ kafkaProperties,
+ autoCommitInterval,
+ useMetrics);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index ad520d8..de201e5 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -91,27 +91,23 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
public Kafka08Fetcher(
SourceContext<T> sourceContext,
- List<KafkaTopicPartition> assignedPartitions,
- HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long autoCommitInterval,
- StartupMode startupMode,
boolean useMetrics) throws Exception
{
super(
sourceContext,
- assignedPartitions,
- restoredSnapshotState,
+ assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
- startupMode,
useMetrics);
this.deserializer = checkNotNull(deserializer);
@@ -122,7 +118,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
// initially, all these partitions are not assigned to a specific broker connection
- for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+ for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
unassignedPartitionsQueue.add(partition);
}
}
@@ -146,43 +142,32 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
PeriodicOffsetCommitter periodicCommitter = null;
try {
- // if we're not restored from a checkpoint, all partitions will not have their offset set;
- // depending on the configured startup mode, accordingly set the starting offsets
- if (!isRestored) {
- switch (startupMode) {
- case EARLIEST:
- for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
- partition.setOffset(OffsetRequest.EarliestTime());
- }
- break;
- case LATEST:
- for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
- partition.setOffset(OffsetRequest.LatestTime());
- }
- break;
- default:
- case GROUP_OFFSETS:
- List<KafkaTopicPartition> partitions = new ArrayList<>(subscribedPartitions().length);
- for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
- partitions.add(partition.getKafkaTopicPartition());
- }
-
- Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitions);
- for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
- Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
- if (offset != null) {
- // the committed offset in ZK represents the next record to process,
- // so we subtract it by 1 to correctly represent internal state
- partition.setOffset(offset - 1);
- } else {
- // if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
- // we default to "auto.offset.reset" like the Kafka high-level consumer
- LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
- " resetting starting offset to 'auto.offset.reset'", partition);
-
- partition.setOffset(invalidOffsetBehavior);
- }
- }
+ // offsets in the state may still be placeholder sentinel values if we are starting fresh, or the
+ // checkpoint / savepoint state we were restored with had not completely been replaced with actual offset
+ // values yet; replace those with actual offsets, according to what the sentinel values represent.
+ for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
+ if (partition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
+ // this will be replaced by an actual offset in SimpleConsumerThread
+ partition.setOffset(OffsetRequest.EarliestTime());
+ } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
+ // this will be replaced by an actual offset in SimpleConsumerThread
+ partition.setOffset(OffsetRequest.LatestTime());
+ } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+ Long committedOffset = zookeeperOffsetHandler.getCommittedOffset(partition.getKafkaTopicPartition());
+ if (committedOffset != null) {
+ // the committed offset in ZK represents the next record to process,
+ // so we subtract it by 1 to correctly represent internal state
+ partition.setOffset(committedOffset - 1);
+ } else {
+ // if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
+ // we default to "auto.offset.reset" like the Kafka high-level consumer
+ LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
+ " resetting starting offset to 'auto.offset.reset'", partition);
+
+ partition.setOffset(invalidOffsetBehavior);
+ }
+ } else {
+ // the partition already has a specific start offset and is ready to be consumed
}
}
@@ -191,7 +176,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval);
periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler,
- subscribedPartitions(), errorHandler, autoCommitInterval);
+ subscribedPartitionStates(), errorHandler, autoCommitInterval);
periodicCommitter.setName("Periodic Kafka partition offset committer");
periodicCommitter.setDaemon(true);
periodicCommitter.start();
@@ -388,7 +373,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
}
// Set committed offsets in topic partition state
- KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitions();
+ KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitionStates();
for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
Long offset = offsets.get(partition.getKafkaTopicPartition());
if (offset != null) {
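
The recurring "- 1" in this fetcher encodes the offset convention the commit keeps consistent across 0.8, 0.9, and 0.10: a committed group offset (and, in the newer clients, KafkaConsumer.position()) means "next record to read", while Flink's partition state stores "last record processed". A tiny self-contained illustration of the round trip:

public class OffsetConvention {
    public static void main(String[] args) {
        long committed = 42L;                       // ZK / Kafka: next record the group would read
        long internalState = committed - 1;         // Flink state: last record processed
        long restoreSeekTarget = internalState + 1; // on restore, seek to "next" again
        System.out.println(restoreSeekTarget == committed); // true
    }
}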
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index 8f2ef09..cec980f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -30,8 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -96,22 +94,12 @@ public class ZookeeperOffsetHandler {
}
/**
- * @param partitions The partitions to read offsets for.
+ * @param partition The partition to read offset for.
* @return The mapping from partition to offset.
* @throws Exception This method forwards exceptions.
*/
- public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
- Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size());
- for (KafkaTopicPartition tp : partitions) {
- Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
-
- if (offset != null) {
- LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.",
- tp.getTopic(), tp.getPartition(), offset);
- ret.put(tp, offset);
- }
- }
- return ret;
+ public Long getCommittedOffset(KafkaTopicPartition partition) throws Exception {
+ return getOffsetFromZooKeeper(curatorClient, groupId, partition.getTopic(), partition.getPartition());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 9a61b91..c7236a2 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -171,8 +171,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
- List<KafkaTopicPartition> thisSubtaskPartitions,
- HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext) throws Exception {
@@ -181,8 +180,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
return new Kafka09Fetcher<>(
sourceContext,
- thisSubtaskPartitions,
- restoredSnapshotState,
+ assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
@@ -194,7 +192,6 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
deserializer,
properties,
pollTimeout,
- startupMode,
useMetrics);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index b7c9bc2..c389486 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -22,7 +22,6 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
@@ -71,8 +70,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
public Kafka09Fetcher(
SourceContext<T> sourceContext,
- List<KafkaTopicPartition> assignedPartitions,
- HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
@@ -84,19 +82,16 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
- StartupMode startupMode,
boolean useMetrics) throws Exception
{
super(
sourceContext,
- assignedPartitions,
- restoredSnapshotState,
+ assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
- startupMode,
useMetrics);
this.deserializer = deserializer;
@@ -114,13 +109,11 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
LOG,
handover,
kafkaProperties,
- subscribedPartitions(),
+ subscribedPartitionStates(),
kafkaMetricGroup,
createCallBridge(),
getFetcherName() + " for " + taskNameWithSubtasks,
pollTimeout,
- startupMode,
- isRestored,
useMetrics);
}
@@ -142,7 +135,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
// get the records for each topic partition
- for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+ for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
List<ConsumerRecord<byte[], byte[]>> partitionRecords =
records.records(partition.getKafkaPartitionHandle());
@@ -226,7 +219,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
@Override
public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
- KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
+ KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitionStates();
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
index a97b3cf..37ba34c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -39,16 +39,12 @@ public class KafkaConsumerCallBridge {
consumer.assign(topicPartitions);
}
- public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
- for (TopicPartition partition : partitions) {
- consumer.seekToBeginning(partition);
- }
+ public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
+ consumer.seekToBeginning(partition);
}
- public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
- for (TopicPartition partition : partitions) {
- consumer.seekToEnd(partition);
- }
+ public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
+ consumer.seekToEnd(partition);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index 03fe2c6..cbe1551 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -19,8 +19,8 @@
package org.apache.flink.streaming.connectors.kafka.internal;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -69,7 +69,7 @@ public class KafkaConsumerThread extends Thread {
private final Properties kafkaProperties;
/** The partitions that this consumer reads from */
- private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
+ private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitionStates;
/** We get this from the outside to publish metrics. **/
private final MetricGroup kafkaMetricGroup;
@@ -80,12 +80,6 @@ public class KafkaConsumerThread extends Thread {
/** The maximum number of milliseconds to wait for a fetch batch */
private final long pollTimeout;
- /** The configured startup mode (relevant only if we're restored from checkpoint / savepoint) */
- private final StartupMode startupMode;
-
- /** Flag whether or not we're restored from checkpoint / savepoint */
- private final boolean isRestored;
-
/** Flag whether to add Kafka's metrics to the Flink metrics */
private final boolean useMetrics;
@@ -103,13 +97,11 @@ public class KafkaConsumerThread extends Thread {
Logger log,
Handover handover,
Properties kafkaProperties,
- KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions,
+ KafkaTopicPartitionState<TopicPartition>[] subscribedPartitionStates,
MetricGroup kafkaMetricGroup,
KafkaConsumerCallBridge consumerCallBridge,
String threadName,
long pollTimeout,
- StartupMode startupMode,
- boolean isRestored,
boolean useMetrics) {
super(threadName);
@@ -120,21 +112,8 @@ public class KafkaConsumerThread extends Thread {
this.kafkaProperties = checkNotNull(kafkaProperties);
this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
this.consumerCallBridge = checkNotNull(consumerCallBridge);
- this.startupMode = checkNotNull(startupMode);
-
- this.subscribedPartitions = checkNotNull(subscribedPartitions);
- this.isRestored = isRestored;
-
- // if we are restoring from a checkpoint / savepoint, all
- // subscribed partitions' state should have defined offsets
- if (isRestored) {
- for (KafkaTopicPartitionState<TopicPartition> subscribedPartition : subscribedPartitions) {
- if (!subscribedPartition.isOffsetDefined()) {
- throw new IllegalArgumentException("Restoring from a checkpoint / savepoint, but found a " +
- "partition state " + subscribedPartition + " that does not have a defined offset.");
- }
- }
- }
+
+ this.subscribedPartitionStates = checkNotNull(subscribedPartitionStates);
this.pollTimeout = pollTimeout;
this.useMetrics = useMetrics;
@@ -173,7 +152,7 @@ public class KafkaConsumerThread extends Thread {
final OffsetCommitCallback offsetCommitCallback = new CommitCallback();
// tell the consumer which partitions to work with
- consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));
+ consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitionStates));
// register Kafka's very own metrics in Flink's metric reporters
if (useMetrics) {
@@ -195,39 +174,23 @@ public class KafkaConsumerThread extends Thread {
return;
}
- if (isRestored) {
- for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
- log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
- "to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
-
- consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
- }
- } else {
- List<TopicPartition> partitionList = convertKafkaPartitions(subscribedPartitions);
-
- // fetch offsets from Kafka, depending on the configured startup mode
- switch (startupMode) {
- case EARLIEST:
- log.info("Setting starting point as earliest offset for partitions {}", partitionList);
-
- consumerCallBridge.seekPartitionsToBeginning(consumer, partitionList);
- break;
- case LATEST:
- log.info("Setting starting point as latest offset for partitions {}", partitionList);
-
- consumerCallBridge.seekPartitionsToEnd(consumer, partitionList);
- break;
- default:
- case GROUP_OFFSETS:
- log.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}",
- kafkaProperties.getProperty("group.id"), partitionList);
- }
+ // offsets in the state may still be placeholder sentinel values if we are starting fresh, or if the
+ // checkpoint / savepoint state we were restored with has not yet been completely overwritten with actual
+ // offset values; replace those placeholders with actual offsets, according to what each sentinel value represents.
+ for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates) {
+ if (partition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
+ consumerCallBridge.seekPartitionToBeginning(consumer, partition.getKafkaPartitionHandle());
+ partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
+ } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
+ consumerCallBridge.seekPartitionToEnd(consumer, partition.getKafkaPartitionHandle());
+ partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
+ } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+ // the KafkaConsumer by default will automatically seek the consumer position
+ // to the committed group offset, so we do not need to do it.
- // on startup, all partition states will not have defined offsets;
- // set the initial states with the offsets fetched from Kafka
- for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
- // the fetched offset represents the next record to process, so we need to subtract it by 1
partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
+ } else {
+ consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
}
}
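To see the new seek logic above in isolation: on its first run, the consumer thread walks all partition states and replaces any placeholder sentinel offset with a concrete position, always storing position - 1 because the state tracks the last processed record. Below is a minimal, self-contained sketch of that pattern; the PartitionState class and the hard-coded position values are simplified, hypothetical stand-ins for KafkaTopicPartitionState and the KafkaConsumer's position() calls.

import java.util.Arrays;
import java.util.List;

public class SentinelResolutionSketch {

    // same sentinel values as KafkaTopicPartitionStateSentinel in the diff
    static final long EARLIEST_OFFSET = -915623761775L;
    static final long LATEST_OFFSET   = -915623761774L;
    static final long GROUP_OFFSET    = -915623761773L;

    static class PartitionState {
        final String name;
        long offset;
        PartitionState(String name, long offset) { this.name = name; this.offset = offset; }
    }

    // hypothetical positions a consumer would report after the respective seeks
    static long positionAfterSeekToBeginning() { return 0L; }
    static long positionAfterSeekToEnd()       { return 500L; }
    static long committedGroupPosition()       { return 42L; }

    public static void main(String[] args) {
        List<PartitionState> states = Arrays.asList(
            new PartitionState("test-topic-0", EARLIEST_OFFSET),
            new PartitionState("test-topic-1", LATEST_OFFSET),
            new PartitionState("test-topic-2", GROUP_OFFSET),
            new PartitionState("test-topic-3", 123L)); // concrete restored offset

        for (PartitionState state : states) {
            if (state.offset == EARLIEST_OFFSET) {
                // seek to beginning, then store position - 1: the state always
                // holds the offset of the last processed record
                state.offset = positionAfterSeekToBeginning() - 1;
            } else if (state.offset == LATEST_OFFSET) {
                state.offset = positionAfterSeekToEnd() - 1;
            } else if (state.offset == GROUP_OFFSET) {
                // the consumer already sits at the committed group offset
                state.offset = committedGroupPosition() - 1;
            }
            // else: a concrete restored offset; a real consumer would seek to offset + 1
            System.out.println(state.name + " starts after offset " + state.offset);
        }
    }
}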
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index abd75cc..49144e6 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -24,10 +24,10 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -118,13 +118,13 @@ public class Kafka09FetcherTest {
@SuppressWarnings("unchecked")
SourceContext<String> sourceContext = mock(SourceContext.class);
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+ Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
sourceContext,
- topics,
- null, /* no restored state */
+ partitionsWithInitialOffsets,
null, /* periodic watermark extractor */
null, /* punctuated watermark extractor */
new TestProcessingTimeService(),
@@ -136,7 +136,6 @@ public class Kafka09FetcherTest {
schema,
new Properties(),
0L,
- StartupMode.GROUP_OFFSETS,
false);
// ----- run the fetcher -----
@@ -256,13 +255,13 @@ public class Kafka09FetcherTest {
@SuppressWarnings("unchecked")
SourceContext<String> sourceContext = mock(SourceContext.class);
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+ Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
sourceContext,
- topics,
- null, /* no restored state */
+ partitionsWithInitialOffsets,
null, /* periodic watermark extractor */
null, /* punctuated watermark extractor */
new TestProcessingTimeService(),
@@ -274,7 +273,6 @@ public class Kafka09FetcherTest {
schema,
new Properties(),
0L,
- StartupMode.GROUP_OFFSETS,
false);
// ----- run the fetcher -----
@@ -372,13 +370,13 @@ public class Kafka09FetcherTest {
// ----- build a fetcher -----
BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+ Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+ Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
sourceContext,
- topics,
- null, /* no restored state */
+ partitionsWithInitialOffsets,
null, /* periodic watermark extractor */
null, /* punctuated watermark extractor */
new TestProcessingTimeService(),
@@ -390,7 +388,6 @@ public class Kafka09FetcherTest {
schema,
new Properties(),
0L,
- StartupMode.GROUP_OFFSETS,
false);
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 1121d1b..144ede8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -40,15 +40,11 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -90,8 +86,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
/** The schema to convert between Kafka's byte messages, and Flink's objects */
protected final KeyedDeserializationSchema<T> deserializer;
- /** The set of topic partitions that the source will read */
- private List<KafkaTopicPartition> subscribedPartitions;
+ /** The set of topic partitions that the source will read, with their initial offsets to start reading from */
+ private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
* to exploit per-partition timestamp characteristics.
@@ -138,17 +134,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
this.deserializer = checkNotNull(deserializer, "valueDeserializer");
}
- /**
- * This method must be called from the subclasses, to set the list of all subscribed partitions
- * that this consumer will fetch from (across all subtasks).
- *
- * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from.
- */
- protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
- checkNotNull(allSubscribedPartitions);
- this.subscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions);
- }
-
// ------------------------------------------------------------------------
// Configuration
// ------------------------------------------------------------------------
@@ -263,17 +248,67 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
// ------------------------------------------------------------------------
@Override
+ public void open(Configuration configuration) {
+ List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
+
+ if (kafkaTopicPartitions != null) {
+ subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
+
+ if (restoredState != null) {
+ for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
+ if (restoredState.containsKey(kafkaTopicPartition)) {
+ subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
+ }
+ }
+
+ LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
+ getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
+ } else {
+ initializeSubscribedPartitionsToStartOffsets(
+ subscribedPartitionsToStartOffsets,
+ kafkaTopicPartitions,
+ getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ startupMode);
+
+ if (subscribedPartitionsToStartOffsets.size() != 0) {
+ switch (startupMode) {
+ case EARLIEST:
+ LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ subscribedPartitionsToStartOffsets.size(),
+ subscribedPartitionsToStartOffsets.keySet());
+ break;
+ case LATEST:
+ LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ subscribedPartitionsToStartOffsets.size(),
+ subscribedPartitionsToStartOffsets.keySet());
+ break;
+ default:
+ case GROUP_OFFSETS:
+ LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ subscribedPartitionsToStartOffsets.size(),
+ subscribedPartitionsToStartOffsets.keySet());
+ }
+ }
+ }
+ }
+ }
+
+ @Override
public void run(SourceContext<T> sourceContext) throws Exception {
- if (subscribedPartitions == null) {
+ if (subscribedPartitionsToStartOffsets == null) {
throw new Exception("The partitions were not set for the consumer");
}
// we need only do work, if we actually have partitions assigned
- if (!subscribedPartitions.isEmpty()) {
+ if (!subscribedPartitionsToStartOffsets.isEmpty()) {
// create the fetcher that will communicate with the Kafka brokers
final AbstractFetcher<T, ?> fetcher = createFetcher(
- sourceContext, subscribedPartitions, restoredState,
+ sourceContext, subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner, punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext());
@@ -327,15 +362,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
}
@Override
- public void open(Configuration configuration) {
- List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
-
- if (kafkaTopicPartitions != null) {
- assignTopicPartitions(kafkaTopicPartitions);
- }
- }
-
- @Override
public void close() throws Exception {
// pretty much the same logic as cancelling
try {
@@ -386,18 +412,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets or the assigned partitions
-
- if (restoredState != null) {
-
- for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoredState.entrySet()) {
- offsetsStateForCheckpoint.add(
- Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
- }
- } else if (subscribedPartitions != null) {
- for (KafkaTopicPartition subscribedPartition : subscribedPartitions) {
- offsetsStateForCheckpoint.add(
- Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
- }
+ for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
+ offsetsStateForCheckpoint.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
// the map cannot be asynchronously updated, because only one checkpoint call can happen
@@ -493,7 +509,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
* data, and emits it into the data streams.
*
* @param sourceContext The source context to emit data to.
- * @param thisSubtaskPartitions The set of partitions that this subtask should handle.
+ * @param subscribedPartitionsToStartOffsets The set of partitions that this subtask should handle, with their start offsets.
* @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
* @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
* @param runtimeContext The task's runtime context.
@@ -504,8 +520,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
*/
protected abstract AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
- List<KafkaTopicPartition> thisSubtaskPartitions,
- HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+ Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext) throws Exception;
@@ -525,60 +540,33 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
// Utilities
// ------------------------------------------------------------------------
- private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
- subscribedPartitions = new ArrayList<>();
-
- if (restoredState != null) {
- for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
- if (restoredState.containsKey(kafkaTopicPartition)) {
- subscribedPartitions.add(kafkaTopicPartition);
- }
- }
- } else {
- Collections.sort(kafkaTopicPartitions, new Comparator<KafkaTopicPartition>() {
- @Override
- public int compare(KafkaTopicPartition o1, KafkaTopicPartition o2) {
- int topicComparison = o1.getTopic().compareTo(o2.getTopic());
-
- if (topicComparison == 0) {
- return o1.getPartition() - o2.getPartition();
- } else {
- return topicComparison;
- }
- }
- });
-
- for (int i = getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i += getRuntimeContext().getNumberOfParallelSubtasks()) {
- subscribedPartitions.add(kafkaTopicPartitions.get(i));
- }
- }
- }
-
/**
- * Selects which of the given partitions should be handled by a specific consumer,
- * given a certain number of consumers.
- *
- * @param allPartitions The partitions to select from
- * @param numConsumers The number of consumers
- * @param consumerIndex The index of the specific consumer
- *
- * @return The sublist of partitions to be handled by that consumer.
+ * Initializes {@link FlinkKafkaConsumerBase#subscribedPartitionsToStartOffsets} with appropriate
+ * values. The method decides which partitions this consumer instance should subscribe to, and also
+ * sets the initial offset each subscribed partition should be started from based on the configured startup mode.
+ *
+ * @param subscribedPartitionsToStartOffsets the partition-to-start-offset map to initialize
+ * @param kafkaTopicPartitions the complete list of all Kafka partitions
+ * @param indexOfThisSubtask the index of this consumer instance
+ * @param numParallelSubtasks total number of parallel consumer instances
+ * @param startupMode the configured startup mode for the consumer
+ *
+ * Note: This method is also exposed for testing.
*/
- protected static List<KafkaTopicPartition> assignPartitions(
- List<KafkaTopicPartition> allPartitions,
- int numConsumers, int consumerIndex) {
- final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
- allPartitions.size() / numConsumers + 1);
-
- for (int i = 0; i < allPartitions.size(); i++) {
- if (i % numConsumers == consumerIndex) {
- thisSubtaskPartitions.add(allPartitions.get(i));
+ protected static void initializeSubscribedPartitionsToStartOffsets(
+ Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
+ List<KafkaTopicPartition> kafkaTopicPartitions,
+ int indexOfThisSubtask,
+ int numParallelSubtasks,
+ StartupMode startupMode) {
+
+ for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
+ if (i % numParallelSubtasks == indexOfThisSubtask) {
+ subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
}
}
-
- return thisSubtaskPartitions;
}
-
+
/**
* Logs the partition information in INFO level.
*
@@ -607,8 +595,17 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
}
@VisibleForTesting
- List<KafkaTopicPartition> getSubscribedPartitions() {
- return subscribedPartitions;
+ void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
+ checkNotNull(allSubscribedPartitions);
+ this.subscribedPartitionsToStartOffsets = new HashMap<>();
+ for (KafkaTopicPartition partition : allSubscribedPartitions) {
+ this.subscribedPartitionsToStartOffsets.put(partition, null);
+ }
+ }
+
+ @VisibleForTesting
+ Map<KafkaTopicPartition, Long> getSubscribedPartitionsToStartOffsets() {
+ return subscribedPartitionsToStartOffsets;
}
@VisibleForTesting
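The partition assignment moved into open() above boils down to a deterministic round-robin over the discovered partition list: subtask i takes every partition whose index is congruent to i modulo the parallelism, seeded with the startup mode's sentinel. A minimal sketch, assuming plain Strings stand in for KafkaTopicPartition and the GROUP_OFFSETS sentinel stands in for startupMode.getStateSentinel():

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignmentSketch {

    static final long GROUP_OFFSET = -915623761773L; // sentinel from the diff

    static Map<String, Long> assign(List<String> allPartitions, int subtaskIndex, int numSubtasks) {
        Map<String, Long> subscribed = new HashMap<>();
        // each subtask takes every numSubtasks-th partition, offset by its own index
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numSubtasks == subtaskIndex) {
                subscribed.put(allPartitions.get(i), GROUP_OFFSET);
            }
        }
        return subscribed;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("t-0", "t-1", "t-2", "t-3", "t-4");
        for (int subtask = 0; subtask < 2; subtask++) {
            // subtask 0 gets t-0, t-2, t-4; subtask 1 gets t-1, t-3
            System.out.println("subtask " + subtask + " -> " + assign(partitions, subtask, 2).keySet());
        }
    }
}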
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
index 331c1a6..f796e62 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -16,18 +16,30 @@
*/
package org.apache.flink.streaming.connectors.kafka.config;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+
/**
* Startup modes for the Kafka Consumer.
*/
public enum StartupMode {
/** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default) */
- GROUP_OFFSETS,
+ GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),
/** Start from the earliest offset possible */
- EARLIEST,
+ EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
/** Start from the latest offset */
- LATEST
-
+ LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+ /** The sentinel offset value corresponding to this startup mode */
+ private long stateSentinel;
+
+ StartupMode(long stateSentinel) {
+ this.stateSentinel = stateSentinel;
+ }
+
+ public long getStateSentinel() {
+ return stateSentinel;
+ }
}
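Attaching the sentinel to StartupMode means the consumer no longer needs a switch statement to translate a configured mode into an initial offset; the enum carries its own placeholder. A standalone copy of the enum, for illustration only:

public class StartupModeSketch {

    enum StartupMode {
        GROUP_OFFSETS(-915623761773L),
        EARLIEST(-915623761775L),
        LATEST(-915623761774L);

        private final long stateSentinel;
        StartupMode(long stateSentinel) { this.stateSentinel = stateSentinel; }
        public long getStateSentinel() { return stateSentinel; }
    }

    public static void main(String[] args) {
        // the sentinel is only resolved to a real offset when the consumer
        // thread first reads the partition (see KafkaConsumerThread above)
        for (StartupMode mode : StartupMode.values()) {
            System.out.println(mode + " -> " + mode.getStateSentinel());
        }
    }
}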
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index b27e996..e021881 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -26,12 +26,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.util.SerializedValue;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,20 +61,14 @@ public abstract class AbstractFetcher<T, KPH> {
protected final Object checkpointLock;
/** All partitions (and their state) that this fetcher is subscribed to */
- private final KafkaTopicPartitionState<KPH>[] allPartitions;
+ private final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates;
/** The mode describing whether the fetcher also generates timestamps and watermarks */
protected final int timestampWatermarkMode;
- /** The startup mode for the consumer (only relevant if the consumer wasn't restored) */
- protected final StartupMode startupMode;
-
/** Flag whether to register metrics for the fetcher */
protected final boolean useMetrics;
- /** Flag whether or not the consumer state was restored from a checkpoint / savepoint */
- protected final boolean isRestored;
-
/** Only relevant for punctuated watermarks: The current cross partition watermark */
private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
@@ -84,19 +76,16 @@ public abstract class AbstractFetcher<T, KPH> {
protected AbstractFetcher(
SourceContext<T> sourceContext,
- List<KafkaTopicPartition> assignedPartitions,
- HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
- StartupMode startupMode,
boolean useMetrics) throws Exception
{
this.sourceContext = checkNotNull(sourceContext);
this.checkpointLock = sourceContext.getCheckpointLock();
- this.startupMode = checkNotNull(startupMode);
this.useMetrics = useMetrics;
// figure out what we watermark mode we will be using
@@ -115,30 +104,25 @@ public abstract class AbstractFetcher<T, KPH> {
throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
}
}
-
+
// create our partition state according to the timestamp/watermark mode
- this.allPartitions = initializePartitions(
- assignedPartitions,
+ this.subscribedPartitionStates = initializeSubscribedPartitionStates(
+ assignedPartitionsWithInitialOffsets,
timestampWatermarkMode,
watermarksPeriodic, watermarksPunctuated,
userCodeClassLoader);
- if (restoredSnapshotState != null) {
- for (KafkaTopicPartitionState<?> partition : allPartitions) {
- Long offset = restoredSnapshotState.get(partition.getKafkaTopicPartition());
- if (offset != null) {
- partition.setOffset(offset);
- }
+ // check that all partition states have a defined offset
+ for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
+ if (!partitionState.isOffsetDefined()) {
+ throw new IllegalArgumentException("The fetcher was assigned partitions with undefined initial offsets.");
}
- this.isRestored = true;
- } else {
- this.isRestored = false;
}
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts =
- (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
+ (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) subscribedPartitionStates;
PeriodicWatermarkEmitter periodicEmitter =
new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
@@ -155,8 +139,8 @@ public abstract class AbstractFetcher<T, KPH> {
*
* @return All subscribed partitions.
*/
- protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
- return allPartitions;
+ protected final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates() {
+ return subscribedPartitionStates;
}
// ------------------------------------------------------------------------
@@ -207,8 +191,8 @@ public abstract class AbstractFetcher<T, KPH> {
// this method assumes that the checkpoint lock is held
assert Thread.holdsLock(checkpointLock);
- HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
- for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
+ HashMap<KafkaTopicPartition, Long> state = new HashMap<>(subscribedPartitionStates.length);
+ for (KafkaTopicPartitionState<?> partition : subscribedPartitionStates()) {
state.put(partition.getKafkaTopicPartition(), partition.getOffset());
}
return state;
@@ -343,7 +327,7 @@ public abstract class AbstractFetcher<T, KPH> {
if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
long newMin = Long.MAX_VALUE;
- for (KafkaTopicPartitionState<?> state : allPartitions) {
+ for (KafkaTopicPartitionState<?> state : subscribedPartitionStates) {
@SuppressWarnings("unchecked")
final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
@@ -371,8 +355,8 @@ public abstract class AbstractFetcher<T, KPH> {
* Utility method that takes the topic partitions and creates the topic partition state
* holders. If a watermark generator per partition exists, this will also initialize those.
*/
- private KafkaTopicPartitionState<KPH>[] initializePartitions(
- List<KafkaTopicPartition> assignedPartitions,
+ private KafkaTopicPartitionState<KPH>[] initializeSubscribedPartitionStates(
+ Map<KafkaTopicPartition, Long> assignedPartitionsToInitialOffsets,
int timestampWatermarkMode,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
@@ -384,13 +368,16 @@ public abstract class AbstractFetcher<T, KPH> {
case NO_TIMESTAMPS_WATERMARKS: {
@SuppressWarnings("unchecked")
KafkaTopicPartitionState<KPH>[] partitions =
- (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
+ (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitionsToInitialOffsets.size()];
int pos = 0;
- for (KafkaTopicPartition partition : assignedPartitions) {
+ for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
// create the kafka version specific partition handle
- KPH kafkaHandle = createKafkaPartitionHandle(partition);
- partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle);
+ KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
+ partitions[pos] = new KafkaTopicPartitionState<>(partition.getKey(), kafkaHandle);
+ partitions[pos].setOffset(partition.getValue());
+
+ pos++;
}
return partitions;
@@ -400,17 +387,20 @@ public abstract class AbstractFetcher<T, KPH> {
@SuppressWarnings("unchecked")
KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
- new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
+ new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()];
int pos = 0;
- for (KafkaTopicPartition partition : assignedPartitions) {
- KPH kafkaHandle = createKafkaPartitionHandle(partition);
+ for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
+ KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
AssignerWithPeriodicWatermarks<T> assignerInstance =
watermarksPeriodic.deserializeValue(userCodeClassLoader);
- partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
- partition, kafkaHandle, assignerInstance);
+ partitions[pos] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+ partition.getKey(), kafkaHandle, assignerInstance);
+ partitions[pos].setOffset(partition.getValue());
+
+ pos++;
}
return partitions;
@@ -420,17 +410,20 @@ public abstract class AbstractFetcher<T, KPH> {
@SuppressWarnings("unchecked")
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
- new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()];
+ new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()];
int pos = 0;
- for (KafkaTopicPartition partition : assignedPartitions) {
- KPH kafkaHandle = createKafkaPartitionHandle(partition);
+ for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
+ KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
AssignerWithPunctuatedWatermarks<T> assignerInstance =
watermarksPunctuated.deserializeValue(userCodeClassLoader);
- partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
- partition, kafkaHandle, assignerInstance);
+ partitions[pos] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+ partition.getKey(), kafkaHandle, assignerInstance);
+ partitions[pos].setOffset(partition.getValue());
+
+ pos++;
}
return partitions;
@@ -452,7 +445,7 @@ public abstract class AbstractFetcher<T, KPH> {
// add current offsets to gauge
MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
- for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
+ for (KafkaTopicPartitionState<?> ktp: subscribedPartitionStates()) {
currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
}
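The net effect of the AbstractFetcher changes above is that partition states are created directly from the (partition -> initial offset) map, so by construction every state has a defined offset; note that the startup sentinels also count as "defined", since isOffsetDefined() only rules out OFFSET_NOT_SET. A condensed sketch with simplified stand-in types:

import java.util.HashMap;
import java.util.Map;

public class FetcherStateInitSketch {

    static final long OFFSET_NOT_SET = -915623761776L;

    static class State {
        final String partition;
        long offset = OFFSET_NOT_SET;
        State(String partition) { this.partition = partition; }
        boolean isOffsetDefined() { return offset != OFFSET_NOT_SET; }
    }

    static State[] initializeStates(Map<String, Long> partitionsToInitialOffsets) {
        State[] states = new State[partitionsToInitialOffsets.size()];
        int pos = 0;
        for (Map.Entry<String, Long> entry : partitionsToInitialOffsets.entrySet()) {
            // seed each state with the initial offset carried in the map
            State state = new State(entry.getKey());
            state.offset = entry.getValue();
            states[pos++] = state;
        }
        // mirror the constructor check in the diff: every state must have a defined offset
        for (State state : states) {
            if (!state.isOffsetDefined()) {
                throw new IllegalArgumentException(
                    "undefined initial offset for partition " + state.partition);
            }
        }
        return states;
    }

    public static void main(String[] args) {
        Map<String, Long> init = new HashMap<>();
        init.put("test-0", -915623761773L); // GROUP_OFFSET sentinel still counts as "defined"
        init.put("test-1", 987654321L);     // concrete restored offset
        System.out.println(initializeStates(init).length + " partition states initialized");
    }
}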
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
index 7cb5f46..adfbf79 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -28,10 +28,6 @@ package org.apache.flink.streaming.connectors.kafka.internals;
* @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
*/
public class KafkaTopicPartitionState<KPH> {
-
- /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
- * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
- public static final long OFFSET_NOT_SET = -915623761776L;
// ------------------------------------------------------------------------
@@ -52,8 +48,8 @@ public class KafkaTopicPartitionState<KPH> {
public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
this.partition = partition;
this.kafkaPartitionHandle = kafkaPartitionHandle;
- this.offset = OFFSET_NOT_SET;
- this.committedOffset = OFFSET_NOT_SET;
+ this.offset = KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
+ this.committedOffset = KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
}
// ------------------------------------------------------------------------
@@ -96,7 +92,7 @@ public class KafkaTopicPartitionState<KPH> {
}
public final boolean isOffsetDefined() {
- return offset != OFFSET_NOT_SET;
+ return offset != KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
}
public final void setCommittedOffset(long offset) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
new file mode 100644
index 0000000..153a326
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * Magic values used to represent special offset states before partitions are actually read.
+ *
+ * The values are all negative. Negative offsets are not used by Kafka (invalid), so we
+ * pick values that are probably (hopefully) not used by Kafka as magic numbers for anything else.
+ */
+public class KafkaTopicPartitionStateSentinel {
+
+ /** Magic number that defines an unset offset. */
+ public static final long OFFSET_NOT_SET = -915623761776L;
+
+ /**
+ * Magic number that defines the partition should start from the earliest offset.
+ *
+ * This is used as a placeholder so that the actual earliest offset can be evaluated lazily
+ * when the partition will actually start to be read by the consumer.
+ */
+ public static final long EARLIEST_OFFSET = -915623761775L;
+
+ /**
+ * Magic number that defines the partition should start from the latest offset.
+ *
+ * This is used as a placeholder so that the actual latest offset can be evaluated lazily
+ * when the partition will actually start to be read by the consumer.
+ */
+ public static final long LATEST_OFFSET = -915623761774L;
+
+ /**
+ * Magic number that defines the partition should start from its committed group offset in Kafka.
+ *
+ * This is used as a placeholder so that the actual committed group offset can be evaluated lazily
+ * when the partition will actually start to be read by the consumer.
+ */
+ public static final long GROUP_OFFSET = -915623761773L;
+
+}
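A property of these constants worth noting: real Kafka offsets are always non-negative, so a negative value in the state can only be one of these sentinels. A small sketch; the isSentinel helper below is hypothetical and not part of the class above:

public class SentinelCheckSketch {

    static final long OFFSET_NOT_SET  = -915623761776L;
    static final long EARLIEST_OFFSET = -915623761775L;
    static final long LATEST_OFFSET   = -915623761774L;
    static final long GROUP_OFFSET    = -915623761773L;

    // hypothetical helper: distinguishes placeholders from real (non-negative) offsets
    static boolean isSentinel(long offset) {
        return offset == OFFSET_NOT_SET
            || offset == EARLIEST_OFFSET
            || offset == LATEST_OFFSET
            || offset == GROUP_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(isSentinel(GROUP_OFFSET)); // true
        System.out.println(isSentinel(16768L));       // false: a real offset
    }
}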
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 38a3ce8..20411e1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
@@ -34,6 +35,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Map;
import java.util.HashMap;
import java.util.List;
@@ -68,8 +70,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
testHarness.open();
// assert that no partitions were found and is empty
- Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null);
- Assert.assertTrue(consumerFunction.getSubscribedPartitions().isEmpty());
+ Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
+ Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
// assert that no state was restored
Assert.assertTrue(consumerFunction.getRestoredState() == null);
@@ -101,10 +103,16 @@ public class FlinkKafkaConsumerBaseMigrationTest {
getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
testHarness.open();
+ // the expected state in "kafka-consumer-migration-test-flink1.1-snapshot-empty-state";
+ // since the state is empty, the consumer should fall back to the configured startup mode to determine start offsets.
+ final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
+ expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("abc", 13), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+ expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("def", 7), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+
// assert that there are partitions and that they are identical to the expected list
- Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null);
- Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
- Assert.assertTrue(consumerFunction.getSubscribedPartitions().equals(partitions));
+ Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
+ Assert.assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets());
// assert that no state was restored
Assert.assertTrue(consumerFunction.getRestoredState() == null);
@@ -136,16 +144,18 @@ public class FlinkKafkaConsumerBaseMigrationTest {
getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
testHarness.open();
- // assert that there are partitions and is identical to expected list
- Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null);
- Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
- Assert.assertEquals(partitions, consumerFunction.getSubscribedPartitions());
-
// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
final HashMap<KafkaTopicPartition, Long> expectedState = new HashMap<>();
expectedState.put(new KafkaTopicPartition("abc", 13), 16768L);
expectedState.put(new KafkaTopicPartition("def", 7), 987654321L);
+ // assert that there are partitions and that they are identical to the expected list
+ Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
+ Assert.assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+
+ // on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
+ Assert.assertEquals(expectedState, consumerFunction.getSubscribedPartitionsToStartOffsets());
+
// assert that state is correctly restored from legacy checkpoint
Assert.assertTrue(consumerFunction.getRestoredState() != null);
Assert.assertEquals(expectedState, consumerFunction.getRestoredState());
@@ -179,8 +189,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
- List<KafkaTopicPartition> thisSubtaskPartitions,
- HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+ Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 980a025..e6ea63f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -42,13 +43,7 @@ import org.mockito.stubbing.Answer;
import java.io.Serializable;
import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -137,6 +132,8 @@ public class FlinkKafkaConsumerBaseTest {
consumer.initializeState(initializationContext);
+ consumer.open(new Configuration());
+
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
// ensure that the list was cleared and refilled. while this is an implementation detail, we use it here
@@ -177,6 +174,8 @@ public class FlinkKafkaConsumerBaseTest {
consumer.initializeState(initializationContext);
+ consumer.open(new Configuration());
+
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
assertFalse(listState.get().iterator().hasNext());
@@ -364,15 +363,10 @@ public class FlinkKafkaConsumerBaseTest {
@SuppressWarnings("unchecked")
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
- List<KafkaTopicPartition> thisSubtaskPartitions,
- HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+ Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext) throws Exception {
- if (restoredSnapshotState != null) {
- Assert.fail("Trying to restore offsets even though there was no restore state.");
- return null;
- }
return mock(AbstractFetcher.class);
}
[3/3] flink git commit: [hotfix] [kafka] Cleanup star / unused
imports in all Flink Kafka tests
Posted by tz...@apache.org.
[hotfix] [kafka] Cleanup star / unused imports in all Flink Kafka tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8bcb2ae3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8bcb2ae3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8bcb2ae3
Branch: refs/heads/master
Commit: 8bcb2ae3ccf6a58d8f42f29d67fdb7d88a95f8ed
Parents: ed68fed
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Feb 28 00:55:25 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Feb 28 01:47:08 2017 +0800
----------------------------------------------------------------------
.../streaming/connectors/kafka/Kafka08ITCase.java | 10 ----------
.../streaming/connectors/kafka/KafkaProducerTest.java | 11 +++++++----
.../kafka/internals/ClosableBlockingQueueTest.java | 8 +++++++-
.../connectors/kafka/internal/HandoverTest.java | 6 ++++--
.../connectors/kafka/FlinkKafkaConsumerBaseTest.java | 13 ++++++++-----
.../kafka/KafkaConsumerPartitionAssignmentTest.java | 4 +++-
.../connectors/kafka/KafkaShortRetentionTestBase.java | 1 +
.../connectors/kafka/KafkaTestEnvironment.java | 3 ---
.../kafka/internals/AbstractFetcherTimestampsTest.java | 7 +++----
.../kafka/internals/KafkaTopicPartitionTest.java | 4 +++-
.../connectors/kafka/testutils/DataGenerators.java | 5 -----
11 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8bcb2ae3/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 334bd2b..3fc00e9 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -48,16 +48,6 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
runSimpleConcurrentProducerConsumerTopology();
}
-// @Test(timeout = 60000)
-// public void testPunctuatedExplicitWMConsumer() throws Exception {
-// runExplicitPunctuatedWMgeneratingConsumerTest(false);
-// }
-
-// @Test(timeout = 60000)
-// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-// runExplicitPunctuatedWMgeneratingConsumerTest(true);
-// }
-
@Test(timeout = 60000)
public void testKeyValueSupport() throws Exception {
runKeyValueTest();
http://git-wip-us.apache.org/repos/asf/flink/blob/8bcb2ae3/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 91fc286..65d7596 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -41,12 +41,15 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Collections;
import java.util.concurrent.Future;
-
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.whenNew;
-import static org.junit.Assert.*;
-
@RunWith(PowerMockRunner.class)
@PrepareForTest(FlinkKafkaProducerBase.class)
public class KafkaProducerTest extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/8bcb2ae3/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
index 6298c92..2df67d9 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
@@ -25,11 +25,17 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.Assert.*;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class ClosableBlockingQueueTest {
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8bcb2ae3/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
index 25040eb..e95b51b 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
@@ -28,8 +28,10 @@ import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
/**
* Tests for the {@link Handover} between Kafka Consumer Thread and the fetcher's main thread.
http://git-wip-us.apache.org/repos/asf/flink/blob/8bcb2ae3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index e6ea63f..33f1b85 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -38,19 +38,22 @@ import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import java.io.Serializable;
import java.lang.reflect.Field;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
http://git-wip-us.apache.org/repos/asf/flink/blob/8bcb2ae3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 3bdfbed..379d53a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -31,7 +31,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Set;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests that the partition assignment is deterministic and stable.
http://git-wip-us.apache.org/repos/asf/flink/blob/8bcb2ae3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 6a1f702..1e85370 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -48,6 +48,7 @@ import java.util.Properties;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
/**
* A class containing a special Kafka broker which has a log retention of only 250 ms.
* This way, we can make sure our consumer is properly handling cases where we run into out of offset
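
The 250 ms retention described in the Javadoc above is driven by the embedded broker's configuration rather than by the consumer. A sketch of the relevant settings, using standard Kafka broker property keys (the exact values and the harness wiring here are illustrative assumptions, not taken from the test base itself):

import java.util.Properties;

public class ShortRetentionBrokerConfigSketch {

    public static Properties brokerProperties() {
        Properties props = new Properties();
        // Retain log segments for only 250 ms, and check for expired
        // segments frequently so out-of-range offsets show up quickly.
        props.setProperty("log.retention.ms", "250");
        props.setProperty("log.retention.check.interval.ms", "100");
        return props;
    }
}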
http://git-wip-us.apache.org/repos/asf/flink/blob/8bcb2ae3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 7f2a816..9a7c96a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
import kafka.server.KafkaServer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
@@ -30,8 +29,6 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.Properties;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8bcb2ae3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 9e9923d..17a375d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -31,12 +30,12 @@ import org.apache.flink.util.SerializedValue;
import org.junit.Test;
import javax.annotation.Nullable;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
@SuppressWarnings("serial")
public class AbstractFetcherTimestampsTest {
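
Removing the StartupMode import here matches the thrust of the commit: by the time a fetcher exists, the consumer's open() has already translated the startup mode into concrete per-partition start offsets. A sketch of building such a seed map (KafkaTopicPartition as in the connector; the partition and offset values are illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class StartOffsetSeedSketch {

    public static Map<KafkaTopicPartition, Long> seedOffsets() {
        Map<KafkaTopicPartition, Long> seed = new HashMap<>();
        // One entry per subscribed partition; with the offsets resolved
        // up front, the fetcher no longer needs to know the StartupMode.
        seed.put(new KafkaTopicPartition("test-topic", 0), 42L);
        return seed;
    }
}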
http://git-wip-us.apache.org/repos/asf/flink/blob/8bcb2ae3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
index 0e16263..b215bd3 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
@@ -23,7 +23,9 @@ import org.junit.Test;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class KafkaTopicPartitionTest {
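
Note that java.lang.reflect.Field and Modifier stay imported: this test inspects the class reflectively rather than exercising behavior. A generic sketch of that style of reflective invariant check, run against java.lang.String purely for illustration:

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class ReflectiveCheckSketch {

    public static void main(String[] args) {
        // Verify that every instance field of a Serializable class is
        // itself primitive or Serializable (arrays qualify as Serializable).
        for (Field field : String.class.getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers())) {
                continue; // static fields are not serialized
            }
            Class<?> type = field.getType();
            boolean ok = type.isPrimitive()
                    || java.io.Serializable.class.isAssignableFrom(type);
            System.out.println(field.getName() + " -> " + ok);
        }
    }
}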
http://git-wip-us.apache.org/repos/asf/flink/blob/8bcb2ae3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 9e8e1d9..c383eb5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -20,13 +20,10 @@ package org.apache.flink.streaming.connectors.kafka.testutils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -46,8 +43,6 @@ import java.util.Collection;
import java.util.Properties;
import java.util.Random;
-import static org.mockito.Mockito.mock;
-
@SuppressWarnings("serial")
public class DataGenerators {
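
DataGenerators keeps RichParallelSourceFunction after the unused imports go; that is the natural base class for a test data generator. A minimal sketch of such a parallel source against the Flink 1.x streaming API, emitting a plain counter instead of the Kafka-bound records the real utility produces:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class SequenceGeneratorSketch extends RichParallelSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long next = 0;
        while (running) {
            // Hold the checkpoint lock while emitting, so records and
            // checkpoint barriers do not interleave mid-element.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next++);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}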