You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/13 15:39:36 UTC
[2/2] flink git commit: [FLINK-3123] [kafka] Allow custom specific
start offsets for FlinkKafkaConsumer
[FLINK-3123] [kafka] Allow custom specific start offsets for FlinkKafkaConsumer
This closes #2687.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f08e535
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f08e535
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f08e535
Branch: refs/heads/master
Commit: 5f08e53592ebd29cfcd8ee486fcfd6229b82aa69
Parents: f214317
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Mar 10 21:11:42 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Mar 13 23:38:13 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/kafka.md | 35 ++++++-
.../connectors/kafka/Kafka010ITCase.java | 4 +
.../connectors/kafka/Kafka08ITCase.java | 9 +-
.../connectors/kafka/Kafka09ITCase.java | 4 +
.../kafka/FlinkKafkaConsumerBase.java | 101 ++++++++++++++++++-
.../connectors/kafka/config/StartupMode.java | 9 +-
.../KafkaConsumerPartitionAssignmentTest.java | 33 ++++--
.../connectors/kafka/KafkaConsumerTestBase.java | 82 +++++++++++++--
8 files changed, 251 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 06e40b2..6d58b0c 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -220,7 +220,40 @@ All versions of the Flink Kafka Consumer have the above explicit configuration m
record. Under these modes, committed offsets in Kafka will be ignored and
not used as starting positions.
-Note that these settings do not affect the start position when the job is
+You can also specify the exact offsets the consumer should start from for each partition:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
+{% endhighlight %}
+</div>
+</div>
+
+The above example configures the consumer to start from the specified offsets for
+partitions 0, 1, and 2 of topic `myTopic`. The offset values should be the
+next record that the consumer should read for each partition. Note that
+if the consumer needs to read a partition which does not have a specified
+offset within the provided offsets map, it will fall back to the default
+group offsets behaviour (i.e. `setStartFromGroupOffsets()`) for that
+particular partition.
+
+Note that these start position configuration methods do not affect the start position when the job is
automatically restored from a failure or manually restored using a savepoint.
On restore, the start position of each Kafka partition is determined by the
offsets stored in the savepoint or checkpoint
http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index a375fb6..2085169 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -147,6 +147,10 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
runStartFromGroupOffsets();
}
+ @Test(timeout = 60000)
+ public void testStartFromSpecificOffsets() throws Exception {
+ runStartFromSpecificOffsets();
+ }
// --- offset committing ---
http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 3fc00e9..2e7c368 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -90,7 +90,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env.getConfig().disableSysoutLogging();
- readSequence(env, StartupMode.GROUP_OFFSETS, standardProps, parallelism, topic, valuesCount, startFrom);
+ readSequence(env, StartupMode.GROUP_OFFSETS, null, standardProps, parallelism, topic, valuesCount, startFrom);
deleteTestTopic(topic);
}
@@ -136,6 +136,11 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
runStartFromGroupOffsets();
}
+ @Test(timeout = 60000)
+ public void testStartFromSpecificOffsets() throws Exception {
+ runStartFromSpecificOffsets();
+ }
+
// --- offset committing ---
@Test(timeout = 60000)
@@ -200,7 +205,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
readProps.setProperty("auto.commit.interval.ms", "500");
// read so that the offset can be committed to ZK
- readSequence(env, StartupMode.GROUP_OFFSETS, readProps, parallelism, topicName, 100, 0);
+ readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, parallelism, topicName, 100, 0);
// get the offset
CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 6added7..ca9965c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -127,6 +127,10 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
runStartFromGroupOffsets();
}
+ @Test(timeout = 60000)
+ public void testStartFromSpecificOffsets() throws Exception {
+ runStartFromSpecificOffsets();
+ }
// --- offset committing ---
http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 144ede8..027751c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -40,11 +40,13 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -104,6 +106,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
/** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}) */
protected StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+ /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS} */
+ protected Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
// ------------------------------------------------------------------------
// runtime state (used individually by each parallel subtask)
// ------------------------------------------------------------------------
@@ -210,23 +215,33 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
/**
* Specifies the consumer to start reading from the earliest offset for all partitions.
- * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+ * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+ *
+ * This method does not affect where partitions are read from when the consumer is restored
+ * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+ * savepoint, only the offsets in the restored state will be used.
*
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
this.startupMode = StartupMode.EARLIEST;
+ this.specificStartupOffsets = null;
return this;
}
/**
* Specifies the consumer to start reading from the latest offset for all partitions.
- * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+ * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+ *
+ * This method does not affect where partitions are read from when the consumer is restored
+ * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+ * savepoint, only the offsets in the restored state will be used.
*
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
+ this.specificStartupOffsets = null;
return this;
}
@@ -236,10 +251,41 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
* properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
* set in the configuration properties will be used for the partition.
*
+ * This method does not affect where partitions are read from when the consumer is restored
+ * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+ * savepoint, only the offsets in the restored state will be used.
+ *
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
this.startupMode = StartupMode.GROUP_OFFSETS;
+ this.specificStartupOffsets = null;
+ return this;
+ }
+
+ /**
+ * Specifies the consumer to start reading partitions from specific offsets, set independently for each partition.
+ * The specified offset should be the offset of the next record that will be read from partitions.
+ * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+ *
+ * If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not subscribed by the
+ * consumer, the entry will be ignored. If the consumer subscribes to a partition that does not exist in the provided
+ * map of offsets, the consumer will fall back to the default group offset behaviour (see
+ * {@link FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
+ *
+ * If the specified offset for a partition is invalid, or the behaviour for that partition is defaulted to group
+ * offsets but still no group offset could be found for it, then the "auto.offset.reset" behaviour set in the
+ * configuration properties will be used for the partition.
+ *
+ * This method does not affect where partitions are read from when the consumer is restored
+ * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+ * savepoint, only the offsets in the restored state will be used.
+ *
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+ this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+ this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
return this;
}
@@ -269,7 +315,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
kafkaTopicPartitions,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
- startupMode);
+ startupMode,
+ specificStartupOffsets);
if (subscribedPartitionsToStartOffsets.size() != 0) {
switch (startupMode) {
@@ -285,6 +332,28 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
break;
+ case SPECIFIC_OFFSETS:
+ LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ subscribedPartitionsToStartOffsets.size(),
+ specificStartupOffsets,
+ subscribedPartitionsToStartOffsets.keySet());
+
+ List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
+ for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
+ if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+ partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
+ }
+ }
+
+ if (partitionsDefaultedToGroupOffsets.size() > 0) {
+ LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
+ "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ partitionsDefaultedToGroupOffsets.size(),
+ partitionsDefaultedToGroupOffsets);
+ }
+ break;
default:
case GROUP_OFFSETS:
LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
@@ -550,6 +619,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
* @param indexOfThisSubtask the index of this consumer instance
* @param numParallelSubtasks total number of parallel consumer instances
* @param startupMode the configured startup mode for the consumer
+ * @param specificStartupOffsets specific partition offsets to start from
+ * (only relevant if startupMode is {@link StartupMode#SPECIFIC_OFFSETS})
*
* Note: This method is also exposed for testing.
*/
@@ -558,11 +629,31 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
List<KafkaTopicPartition> kafkaTopicPartitions,
int indexOfThisSubtask,
int numParallelSubtasks,
- StartupMode startupMode) {
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
if (i % numParallelSubtasks == indexOfThisSubtask) {
- subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+ if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
+ subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+ } else {
+ if (specificStartupOffsets == null) {
+ throw new IllegalArgumentException(
+ "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
+ ", but no specific offsets were specified");
+ }
+
+ KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
+
+ Long specificOffset = specificStartupOffsets.get(partition);
+ if (specificOffset != null) {
+ // since the specified offsets represent the next record to read, we subtract
+ // it by one so that the initial state of the consumer will be correct
+ subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1);
+ } else {
+ subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
index f796e62..8fc2fe0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -30,7 +30,14 @@ public enum StartupMode {
EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
/** Start from the latest offset */
- LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+ LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),
+
+ /**
+ * Start from user-supplied specific offsets for each partition.
+ * Since this mode will have specific offsets to start with, we do not need a sentinel value;
+ * using Long.MIN_VALUE as a placeholder.
+ */
+ SPECIFIC_OFFSETS(Long.MIN_VALUE);
/** The sentinel offset value corresponding to this startup mode */
private long stateSentinel;
http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 379d53a..c24640d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -56,7 +56,8 @@ public class KafkaConsumerPartitionAssignmentTest {
inPartitions,
i,
inPartitions.size(),
- StartupMode.GROUP_OFFSETS);
+ StartupMode.GROUP_OFFSETS,
+ null);
List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
@@ -95,7 +96,8 @@ public class KafkaConsumerPartitionAssignmentTest {
partitions,
i,
numConsumers,
- StartupMode.GROUP_OFFSETS);
+ StartupMode.GROUP_OFFSETS,
+ null);
List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
@@ -138,7 +140,8 @@ public class KafkaConsumerPartitionAssignmentTest {
inPartitions,
i,
numConsumers,
- StartupMode.GROUP_OFFSETS);
+ StartupMode.GROUP_OFFSETS,
+ null);
List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
@@ -169,7 +172,8 @@ public class KafkaConsumerPartitionAssignmentTest {
ep,
2,
4,
- StartupMode.GROUP_OFFSETS);
+ StartupMode.GROUP_OFFSETS,
+ null);
assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
subscribedPartitionsToStartOffsets = new HashMap<>();
@@ -178,7 +182,8 @@ public class KafkaConsumerPartitionAssignmentTest {
ep,
0,
1,
- StartupMode.GROUP_OFFSETS);
+ StartupMode.GROUP_OFFSETS,
+ null);
assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
}
catch (Exception e) {
@@ -218,21 +223,24 @@ public class KafkaConsumerPartitionAssignmentTest {
initialPartitions,
0,
numConsumers,
- StartupMode.GROUP_OFFSETS);
+ StartupMode.GROUP_OFFSETS,
+ null);
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
subscribedPartitionsToStartOffsets2,
initialPartitions,
1,
numConsumers,
- StartupMode.GROUP_OFFSETS);
+ StartupMode.GROUP_OFFSETS,
+ null);
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
subscribedPartitionsToStartOffsets3,
initialPartitions,
2,
numConsumers,
- StartupMode.GROUP_OFFSETS);
+ StartupMode.GROUP_OFFSETS,
+ null);
List<KafkaTopicPartition> subscribedPartitions1 = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
List<KafkaTopicPartition> subscribedPartitions2 = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
@@ -274,21 +282,24 @@ public class KafkaConsumerPartitionAssignmentTest {
newPartitions,
0,
numConsumers,
- StartupMode.GROUP_OFFSETS);
+ StartupMode.GROUP_OFFSETS,
+ null);
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
subscribedPartitionsToStartOffsets2,
newPartitions,
1,
numConsumers,
- StartupMode.GROUP_OFFSETS);
+ StartupMode.GROUP_OFFSETS,
+ null);
FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
subscribedPartitionsToStartOffsets3,
newPartitions,
2,
numConsumers,
- StartupMode.GROUP_OFFSETS);
+ StartupMode.GROUP_OFFSETS,
+ null);
List<KafkaTopicPartition> subscribedPartitions1New = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
List<KafkaTopicPartition> subscribedPartitions2New = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 580c507..ddac61c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -62,6 +62,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
@@ -349,7 +350,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
(o3 != null) ? o3.intValue() : 0
));
- readSequence(env2, StartupMode.GROUP_OFFSETS, standardProps, topicName, partitionsToValuesCountAndStartOffset);
+ readSequence(env2, StartupMode.GROUP_OFFSETS, null, standardProps, topicName, partitionsToValuesCountAndStartOffset);
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
@@ -465,7 +466,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
- readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+ readSequence(env, StartupMode.EARLIEST, null, readProps, parallelism, topicName, recordsInEachPartition, 0);
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
@@ -619,7 +620,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* partition 2 --> start from offset 43, read to offset 49 (7 records)
*/
public void runStartFromGroupOffsets() throws Exception {
- // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+ // 3 partitions with 50 records each (offsets 0-49)
final int parallelism = 3;
final int recordsInEachPartition = 50;
@@ -645,7 +646,71 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(50, 0)); // partition 1 should read offset 0-49
partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(7, 43)); // partition 2 should read offset 43-49
- readSequence(env, StartupMode.GROUP_OFFSETS, readProps, topicName, partitionsToValueCountAndStartOffsets);
+ readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, topicName, partitionsToValueCountAndStartOffsets);
+
+ kafkaOffsetHandler.close();
+ deleteTestTopic(topicName);
+ }
+
+ /**
+ * This test ensures that the consumer correctly uses user-supplied specific offsets when explicitly configured to
+ * start from specific offsets. For partitions for which no specific offset can be found, the starting position
+ * should fall back to the group offsets behaviour.
+ *
+ * 4 partitions will have 50 records with offsets 0 to 49. The supplied specific offsets map is:
+ * partition 0 --> start from offset 19
+ * partition 1 --> not set
+ * partition 2 --> start from offset 22
+ * partition 3 --> not set
+ * partition 4 --> start from offset 26 (this should be ignored because the partition does not exist)
+ *
+ * The partitions and their committed group offsets are setup as:
+ * partition 0 --> committed offset 23
+ * partition 1 --> committed offset 31
+ * partition 2 --> committed offset 43
+ * partition 3 --> no commit offset
+ *
+ * When configured to start from these specific offsets, each partition should read:
+ * partition 0 --> start from offset 19, read to offset 49 (31 records)
+ * partition 1 --> fallback to group offsets, so start from offset 31, read to offset 49 (19 records)
+ * partition 2 --> start from offset 22, read to offset 49 (28 records)
+ * partition 3 --> fallback to group offsets, but since there is no group offset for this partition,
+ * will default to "auto.offset.reset" (set to "earliest"),
+ * so start from offset 0, read to offset 49 (50 records)
+ */
+ public void runStartFromSpecificOffsets() throws Exception {
+ // 4 partitions with 50 records each (offsets 0-49)
+ final int parallelism = 4;
+ final int recordsInEachPartition = 50;
+
+ final String topicName = writeSequence("testStartFromSpecificOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.getConfig().disableSysoutLogging();
+ env.setParallelism(parallelism);
+
+ Properties readProps = new Properties();
+ readProps.putAll(standardProps);
+ readProps.setProperty("auto.offset.reset", "earliest"); // partition 3 should default back to this behaviour
+
+ Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+ specificStartupOffsets.put(new KafkaTopicPartition(topicName, 0), 19L);
+ specificStartupOffsets.put(new KafkaTopicPartition(topicName, 2), 22L);
+ specificStartupOffsets.put(new KafkaTopicPartition(topicName, 4), 26L); // non-existing partition, should be ignored
+
+ // only the committed offset for partition 1 should be used, because partition 1 has no entry in specific offset map
+ KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+ kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+ kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+ kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+ Map<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets = new HashMap<>();
+ partitionsToValueCountAndStartOffsets.put(0, new Tuple2<>(31, 19)); // partition 0 should read offset 19-49
+ partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(19, 31)); // partition 1 should read offset 31-49
+ partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22)); // partition 2 should read offset 22-49
+ partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0)); // partition 3 should read offset 0-49
+
+ readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets);
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
@@ -1781,6 +1846,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
*/
protected void readSequence(final StreamExecutionEnvironment env,
final StartupMode startupMode,
+ final Map<KafkaTopicPartition, Long> specificStartupOffsets,
final Properties cc,
final String topicName,
final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception {
@@ -1807,6 +1873,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
case LATEST:
consumer.setStartFromLatest();
break;
+ case SPECIFIC_OFFSETS:
+ consumer.setStartFromSpecificOffsets(specificStartupOffsets);
+ break;
case GROUP_OFFSETS:
consumer.setStartFromGroupOffsets();
break;
@@ -1874,11 +1943,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
/**
- * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Properties, String, Map)} to
+ * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Map, Properties, String, Map)} to
* expect reading from the same start offset and the same value count for all partitions of a single Kafka topic.
*/
protected void readSequence(final StreamExecutionEnvironment env,
final StartupMode startupMode,
+ final Map<KafkaTopicPartition, Long> specificStartupOffsets,
final Properties cc,
final int sourceParallelism,
final String topicName,
@@ -1888,7 +1958,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
for (int i = 0; i < sourceParallelism; i++) {
partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom));
}
- readSequence(env, startupMode, cc, topicName, partitionsToValuesCountAndStartOffset);
+ readSequence(env, startupMode, specificStartupOffsets, cc, topicName, partitionsToValuesCountAndStartOffset);
}
protected String writeSequence(