You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/13 15:39:36 UTC

[2/2] flink git commit: [FLINK-3123] [kafka] Allow custom specific start offsets for FlinkKafkaConsumer

[FLINK-3123] [kafka] Allow custom specific start offsets for FlinkKafkaConsumer

This closes #2687.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f08e535
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f08e535
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f08e535

Branch: refs/heads/master
Commit: 5f08e53592ebd29cfcd8ee486fcfd6229b82aa69
Parents: f214317
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Mar 10 21:11:42 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Mar 13 23:38:13 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  35 ++++++-
 .../connectors/kafka/Kafka010ITCase.java        |   4 +
 .../connectors/kafka/Kafka08ITCase.java         |   9 +-
 .../connectors/kafka/Kafka09ITCase.java         |   4 +
 .../kafka/FlinkKafkaConsumerBase.java           | 101 ++++++++++++++++++-
 .../connectors/kafka/config/StartupMode.java    |   9 +-
 .../KafkaConsumerPartitionAssignmentTest.java   |  33 ++++--
 .../connectors/kafka/KafkaConsumerTestBase.java |  82 +++++++++++++--
 8 files changed, 251 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 06e40b2..6d58b0c 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -220,7 +220,40 @@ All versions of the Flink Kafka Consumer have the above explicit configuration m
  record. Under these modes, committed offsets in Kafka will be ignored and
  not used as starting positions.
  
-Note that these settings do not affect the start position when the job is
+You can also specify the exact offsets the consumer should start from for each partition:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
+{% endhighlight %}
+</div>
+</div>
+
+The above example configures the consumer to start from the specified offsets for
+partitions 0, 1, and 2 of topic `myTopic`. The offset values should be the
+next record that the consumer should read for each partition. Note that
+if the consumer needs to read a partition which does not have a specified
+offset within the provided offsets map, it will fallback to the default
+group offsets behaviour (i.e. `setStartFromGroupOffsets()`) for that
+particular partition.
+
+Note that these start position configuration methods do not affect the start position when the job is
 automatically restored from a failure or manually restored using a savepoint.
 On restore, the start position of each Kafka partition is determined by the
 offsets stored in the savepoint or checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index a375fb6..2085169 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -147,6 +147,10 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		runStartFromGroupOffsets();
 	}
 
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
 
 	// --- offset committing ---
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 3fc00e9..2e7c368 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -90,7 +90,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.getConfig().disableSysoutLogging();
 
-		readSequence(env, StartupMode.GROUP_OFFSETS, standardProps, parallelism, topic, valuesCount, startFrom);
+		readSequence(env, StartupMode.GROUP_OFFSETS, null, standardProps, parallelism, topic, valuesCount, startFrom);
 
 		deleteTestTopic(topic);
 	}
@@ -136,6 +136,11 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		runStartFromGroupOffsets();
 	}
 
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
+
 	// --- offset committing ---
 
 	@Test(timeout = 60000)
@@ -200,7 +205,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		readProps.setProperty("auto.commit.interval.ms", "500");
 
 		// read so that the offset can be committed to ZK
-		readSequence(env, StartupMode.GROUP_OFFSETS, readProps, parallelism, topicName, 100, 0);
+		readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, parallelism, topicName, 100, 0);
 
 		// get the offset
 		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 6added7..ca9965c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -127,6 +127,10 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 		runStartFromGroupOffsets();
 	}
 
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
 
 	// --- offset committing ---
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 144ede8..027751c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -40,11 +40,13 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -104,6 +106,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	/** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}) */
 	protected StartupMode startupMode = StartupMode.GROUP_OFFSETS;
 
+	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS} */
+	protected Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
 	// ------------------------------------------------------------------------
 	//  runtime state (used individually by each parallel subtask) 
 	// ------------------------------------------------------------------------
@@ -210,23 +215,33 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 	/**
 	 * Specifies the consumer to start reading from the earliest offset for all partitions.
-	 * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * This method does not effect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
 	 *
 	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
 		this.startupMode = StartupMode.EARLIEST;
+		this.specificStartupOffsets = null;
 		return this;
 	}
 
 	/**
 	 * Specifies the consumer to start reading from the latest offset for all partitions.
-	 * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * This method does not effect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
 	 *
 	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
 		this.startupMode = StartupMode.LATEST;
+		this.specificStartupOffsets = null;
 		return this;
 	}
 
@@ -236,10 +251,41 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
 	 * set in the configuration properties will be used for the partition.
 	 *
+	 * This method does not effect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
+	 *
 	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
 		this.startupMode = StartupMode.GROUP_OFFSETS;
+		this.specificStartupOffsets = null;
+		return this;
+	}
+
+	/**
+	 * Specifies the consumer to start reading partitions from specific offsets, set independently for each partition.
+	 * The specified offset should be the offset of the next record that will be read from partitions.
+	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not subscribed by the
+	 * consumer, the entry will be ignored. If the consumer subscribes to a partition that does not exist in the provided
+	 * map of offsets, the consumer will fallback to the default group offset behaviour (see
+	 * {@link FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
+	 *
+	 * If the specified offset for a partition is invalid, or the behaviour for that partition is defaulted to group
+	 * offsets but still no group offset could be found for it, then the "auto.offset.reset" behaviour set in the
+	 * configuration properties will be used for the partition
+	 *
+	 * This method does not effect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
+	 *
+	 * @return The consumer object, to allow function chaining.
+	 */
+	public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+		this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+		this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
 		return this;
 	}
 
@@ -269,7 +315,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 					kafkaTopicPartitions,
 					getRuntimeContext().getIndexOfThisSubtask(),
 					getRuntimeContext().getNumberOfParallelSubtasks(),
-					startupMode);
+					startupMode,
+					specificStartupOffsets);
 
 				if (subscribedPartitionsToStartOffsets.size() != 0) {
 					switch (startupMode) {
@@ -285,6 +332,28 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 								subscribedPartitionsToStartOffsets.size(),
 								subscribedPartitionsToStartOffsets.keySet());
 							break;
+						case SPECIFIC_OFFSETS:
+							LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
+								getRuntimeContext().getIndexOfThisSubtask(),
+								subscribedPartitionsToStartOffsets.size(),
+								specificStartupOffsets,
+								subscribedPartitionsToStartOffsets.keySet());
+
+							List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
+							for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
+								if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+									partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
+								}
+							}
+
+							if (partitionsDefaultedToGroupOffsets.size() > 0) {
+								LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
+										"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
+									getRuntimeContext().getIndexOfThisSubtask(),
+									partitionsDefaultedToGroupOffsets.size(),
+									partitionsDefaultedToGroupOffsets);
+							}
+							break;
 						default:
 						case GROUP_OFFSETS:
 							LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
@@ -550,6 +619,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * @param indexOfThisSubtask the index of this consumer instance
 	 * @param numParallelSubtasks total number of parallel consumer instances
 	 * @param startupMode the configured startup mode for the consumer
+	 * @param specificStartupOffsets specific partition offsets to start from
+	 *                               (only relevant if startupMode is {@link StartupMode#SPECIFIC_OFFSETS})
 	 *
 	 * Note: This method is also exposed for testing.
 	 */
@@ -558,11 +629,31 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			List<KafkaTopicPartition> kafkaTopicPartitions,
 			int indexOfThisSubtask,
 			int numParallelSubtasks,
-			StartupMode startupMode) {
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
 
 		for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
 			if (i % numParallelSubtasks == indexOfThisSubtask) {
-				subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+				if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
+					subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+				} else {
+					if (specificStartupOffsets == null) {
+						throw new IllegalArgumentException(
+							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
+								", but no specific offsets were specified");
+					}
+
+					KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
+
+					Long specificOffset = specificStartupOffsets.get(partition);
+					if (specificOffset != null) {
+						// since the specified offsets represent the next record to read, we subtract
+						// it by one so that the initial state of the consumer will be correct
+						subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1);
+					} else {
+						subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+					}
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
index f796e62..8fc2fe0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -30,7 +30,14 @@ public enum StartupMode {
 	EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
 
 	/** Start from the latest offset */
-	LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+	LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),
+
+	/**
+	 * Start from user-supplied specific offsets for each partition.
+	 * Since this mode will have specific offsets to start with, we do not need a sentinel value;
+	 * using Long.MIN_VALUE as a placeholder.
+	 */
+	SPECIFIC_OFFSETS(Long.MIN_VALUE);
 
 	/** The sentinel offset value corresponding to this startup mode */
 	private long stateSentinel;

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 379d53a..c24640d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -56,7 +56,8 @@ public class KafkaConsumerPartitionAssignmentTest {
 					inPartitions,
 					i,
 					inPartitions.size(),
-					StartupMode.GROUP_OFFSETS);
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -95,7 +96,8 @@ public class KafkaConsumerPartitionAssignmentTest {
 					partitions,
 					i,
 					numConsumers,
-					StartupMode.GROUP_OFFSETS);
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -138,7 +140,8 @@ public class KafkaConsumerPartitionAssignmentTest {
 					inPartitions,
 					i,
 					numConsumers,
-					StartupMode.GROUP_OFFSETS);
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -169,7 +172,8 @@ public class KafkaConsumerPartitionAssignmentTest {
 				ep,
 				2,
 				4,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
 
 			subscribedPartitionsToStartOffsets = new HashMap<>();
@@ -178,7 +182,8 @@ public class KafkaConsumerPartitionAssignmentTest {
 				ep,
 				0,
 				1,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
 		}
 		catch (Exception e) {
@@ -218,21 +223,24 @@ public class KafkaConsumerPartitionAssignmentTest {
 				initialPartitions,
 				0,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets2,
 				initialPartitions,
 				1,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets3,
 				initialPartitions,
 				2,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			List<KafkaTopicPartition> subscribedPartitions1 = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
 			List<KafkaTopicPartition> subscribedPartitions2 = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
@@ -274,21 +282,24 @@ public class KafkaConsumerPartitionAssignmentTest {
 				newPartitions,
 				0,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets2,
 				newPartitions,
 				1,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets3,
 				newPartitions,
 				2,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			List<KafkaTopicPartition> subscribedPartitions1New = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
 			List<KafkaTopicPartition> subscribedPartitions2New = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 580c507..ddac61c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -62,6 +62,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
@@ -349,7 +350,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			(o3 != null) ? o3.intValue() : 0
 		));
 
-		readSequence(env2, StartupMode.GROUP_OFFSETS, standardProps, topicName, partitionsToValuesCountAndStartOffset);
+		readSequence(env2, StartupMode.GROUP_OFFSETS, null, standardProps, topicName, partitionsToValuesCountAndStartOffset);
 
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
@@ -465,7 +466,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
 		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
 
-		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+		readSequence(env, StartupMode.EARLIEST, null, readProps, parallelism, topicName, recordsInEachPartition, 0);
 
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
@@ -619,7 +620,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * 	partition 2 --> start from offset 43, read to offset 49 (7 records)
 	 */
 	public void runStartFromGroupOffsets() throws Exception {
-		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		// 3 partitions with 50 records each (offsets 0-49)
 		final int parallelism = 3;
 		final int recordsInEachPartition = 50;
 
@@ -645,7 +646,71 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(50, 0)); // partition 1 should read offset 0-49
 		partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(7, 43));	// partition 2 should read offset 43-49
 
-		readSequence(env, StartupMode.GROUP_OFFSETS, readProps, topicName, partitionsToValueCountAndStartOffsets);
+		readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, topicName, partitionsToValueCountAndStartOffsets);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
+
+	/**
+	 * This test ensures that the consumer correctly uses user-supplied specific offsets when explicitly configured to
+	 * start from specific offsets. For partitions which a specific offset can not be found for, the starting position
+	 * for them should fallback to the group offsets behaviour.
+	 *
+	 * 4 partitions will have 50 records with offsets 0 to 49. The supplied specific offsets map is:
+	 * 	partition 0 --> start from offset 19
+	 * 	partition 1 --> not set
+	 * 	partition 2 --> start from offset 22
+	 * 	partition 3 --> not set
+	 * 	partition 4 --> start from offset 26 (this should be ignored because the partition does not exist)
+	 *
+	 * The partitions and their committed group offsets are setup as:
+	 * 	partition 0 --> committed offset 23
+	 * 	partition 1 --> committed offset 31
+	 * 	partition 2 --> committed offset 43
+	 * 	partition 3 --> no commit offset
+	 *
+	 * When configured to start from these specific offsets, each partition should read:
+	 * 	partition 0 --> start from offset 19, read to offset 49 (31 records)
+	 * 	partition 1 --> fallback to group offsets, so start from offset 31, read to offset 49 (19 records)
+	 * 	partition 2 --> start from offset 22, read to offset 49 (28 records)
+	 * 	partition 3 --> fallback to group offsets, but since there is no group offset for this partition,
+	 * 	                will default to "auto.offset.reset" (set to "earliest"),
+	 * 	                so start from offset 0, read to offset 49 (50 records)
+	 */
+	public void runStartFromSpecificOffsets() throws Exception {
+		// 4 partitions with 50 records each (offsets 0-49)
+		final int parallelism = 4;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testStartFromSpecificOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "earliest"); // partition 3 should default back to this behaviour
+
+		Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+		specificStartupOffsets.put(new KafkaTopicPartition(topicName, 0), 19L);
+		specificStartupOffsets.put(new KafkaTopicPartition(topicName, 2), 22L);
+		specificStartupOffsets.put(new KafkaTopicPartition(topicName, 4), 26L); // non-existing partition, should be ignored
+
+		// only the committed offset for partition 1 should be used, because partition 1 has no entry in specific offset map
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+		Map<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets = new HashMap<>();
+		partitionsToValueCountAndStartOffsets.put(0, new Tuple2<>(31, 19)); // partition 0 should read offset 19-49
+		partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(19, 31)); // partition 1 should read offset 31-49
+		partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22));	// partition 2 should read offset 22-49
+		partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0));	// partition 3 should read offset 0-49
+
+		readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets);
 
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
@@ -1781,6 +1846,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 */
 	protected void readSequence(final StreamExecutionEnvironment env,
 								final StartupMode startupMode,
+								final Map<KafkaTopicPartition, Long> specificStartupOffsets,
 								final Properties cc,
 								final String topicName,
 								final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception {
@@ -1807,6 +1873,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			case LATEST:
 				consumer.setStartFromLatest();
 				break;
+			case SPECIFIC_OFFSETS:
+				consumer.setStartFromSpecificOffsets(specificStartupOffsets);
+				break;
 			case GROUP_OFFSETS:
 				consumer.setStartFromGroupOffsets();
 				break;
@@ -1874,11 +1943,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Properties, String, Map)} to
+	 * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Map, Properties, String, Map)} to
 	 * expect reading from the same start offset and the same value count for all partitions of a single Kafka topic.
 	 */
 	protected void readSequence(final StreamExecutionEnvironment env,
 								final StartupMode startupMode,
+								final Map<KafkaTopicPartition, Long> specificStartupOffsets,
 								final Properties cc,
 								final int sourceParallelism,
 								final String topicName,
@@ -1888,7 +1958,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		for (int i = 0; i < sourceParallelism; i++) {
 			partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom));
 		}
-		readSequence(env, startupMode, cc, topicName, partitionsToValuesCountAndStartOffset);
+		readSequence(env, startupMode, specificStartupOffsets, cc, topicName, partitionsToValuesCountAndStartOffset);
 	}
 
 	protected String writeSequence(