Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/27 17:49:25 UTC

[2/3] flink git commit: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to open()

[FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to open()

This closes #3378.
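
For context, the core of this change: instead of each fetcher lazily deciding where to start reading, FlinkKafkaConsumerBase#open() now eagerly builds a Map<KafkaTopicPartition, Long> in which freshly assigned partitions carry a placeholder sentinel derived from the configured StartupMode, and restored partitions carry their checkpointed offsets. A minimal, self-contained sketch of the idea (the sentinel values below are placeholders; the real constants live in the new KafkaTopicPartitionStateSentinel class, and String keys stand in for KafkaTopicPartition):

    import java.util.HashMap;
    import java.util.Map;

    public class StartOffsetSketch {

        // placeholder sentinels; the actual constants are defined in
        // KafkaTopicPartitionStateSentinel and can never collide with real offsets
        static final long EARLIEST_OFFSET = -2L;
        static final long LATEST_OFFSET = -1L;
        static final long GROUP_OFFSET = -3L;

        public static void main(String[] args) {
            Map<String, Long> subscribedPartitionsToStartOffsets = new HashMap<>();

            // fresh start: open() stores the sentinel of the configured StartupMode
            subscribedPartitionsToStartOffsets.put("test-topic-0", GROUP_OFFSET);

            // restored run: open() stores the offset of the last record processed
            subscribedPartitionsToStartOffsets.put("test-topic-1", 742L);

            // the fetchers later resolve sentinels against Kafka / ZooKeeper and,
            // for concrete offsets, simply seek to offset + 1 (the next record)
            for (Map.Entry<String, Long> e : subscribedPartitionsToStartOffsets.entrySet()) {
                if (e.getValue() < 0) {
                    System.out.println(e.getKey() + " -> resolve sentinel " + e.getValue());
                } else {
                    System.out.println(e.getKey() + " -> seek to " + (e.getValue() + 1));
                }
            }
        }
    }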


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed68fedb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed68fedb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed68fedb

Branch: refs/heads/master
Commit: ed68fedbe90db03823d75a020510ad3c344fa73e
Parents: 72f56d1
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Feb 21 23:05:32 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Feb 28 00:54:48 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer010.java |   9 +-
 .../kafka/internal/Kafka010Fetcher.java         |  12 +-
 .../internal/KafkaConsumerCallBridge010.java    |   9 +-
 .../connectors/kafka/Kafka010FetcherTest.java   |  23 +-
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  22 +-
 .../kafka/internals/Kafka08Fetcher.java         |  77 +++----
 .../kafka/internals/ZookeeperOffsetHandler.java |  18 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |   9 +-
 .../kafka/internal/Kafka09Fetcher.java          |  17 +-
 .../kafka/internal/KafkaConsumerCallBridge.java |  12 +-
 .../kafka/internal/KafkaConsumerThread.java     |  79 ++-----
 .../connectors/kafka/Kafka09FetcherTest.java    |  23 +-
 .../kafka/FlinkKafkaConsumerBase.java           | 187 ++++++++--------
 .../connectors/kafka/config/StartupMode.java    |  20 +-
 .../kafka/internals/AbstractFetcher.java        |  89 ++++----
 .../internals/KafkaTopicPartitionState.java     |  10 +-
 .../KafkaTopicPartitionStateSentinel.java       |  55 +++++
 .../FlinkKafkaConsumerBaseMigrationTest.java    |  33 ++-
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  20 +-
 .../KafkaConsumerPartitionAssignmentTest.java   | 222 ++++++++++++-------
 .../AbstractFetcherTimestampsTest.java          |  37 ++--
 21 files changed, 510 insertions(+), 473 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 3a58216..716fa19 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW
 import org.apache.flink.util.SerializedValue;
 
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.Map;
 import java.util.List;
 import java.util.Properties;
 
@@ -128,8 +128,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 	@Override
 	protected AbstractFetcher<T, ?> createFetcher(
 			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> thisSubtaskPartitions,
-			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext) throws Exception {
@@ -138,8 +137,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 
 		return new Kafka010Fetcher<>(
 				sourceContext,
-				thisSubtaskPartitions,
-				restoredSnapshotState,
+				assignedPartitionsWithInitialOffsets,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				runtimeContext.getProcessingTimeService(),
@@ -151,7 +149,6 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 				deserializer,
 				properties,
 				pollTimeout,
-				startupMode,
 				useMetrics);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index efb6f88..da6ecd0 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -22,7 +22,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -32,8 +31,7 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -48,8 +46,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 
 	public Kafka010Fetcher(
 			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> assignedPartitions,
-			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			ProcessingTimeService processingTimeProvider,
@@ -61,13 +58,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
-			StartupMode startupMode,
 			boolean useMetrics) throws Exception
 	{
 		super(
 				sourceContext,
-				assignedPartitions,
-				restoredSnapshotState,
+				assignedPartitionsWithInitialOffsets,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				processingTimeProvider,
@@ -79,7 +74,6 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 				deserializer,
 				kafkaProperties,
 				pollTimeout,
-				startupMode,
 				useMetrics);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
index 1e0bc5b..0fda9a6 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -39,12 +40,12 @@ public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
 	}
 
 	@Override
-	public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
-		consumer.seekToBeginning(partitions);
+	public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
+		consumer.seekToBeginning(Collections.singletonList(partition));
 	}
 
 	@Override
-	public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
-		consumer.seekToEnd(partitions);
+	public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
+		consumer.seekToEnd(Collections.singletonList(partition));
 	}
 }
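
A note on why the bridge methods change shape: startup-offset resolution in KafkaConsumerThread now happens one partition state at a time, so the seek calls take a single partition. The 0.10-specific override remains necessary because the Kafka client changed seekToBeginning / seekToEnd from varargs to java.util.Collection arguments between 0.9 and 0.10. A small sketch of that difference (the SeekSketch class is hypothetical; the two KafkaConsumer calls are the same ones used in the diff):

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // compiles against the 0.10 client; 0.9 clients only offer the varargs form
    class SeekSketch {

        // Kafka 0.10+: seekToBeginning takes a java.util.Collection
        static void seekToBeginning010(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
            consumer.seekToBeginning(Collections.singletonList(tp));
        }

        // Kafka 0.9 equivalent (no longer compiles against 0.10+ clients):
        //     consumer.seekToBeginning(tp);   // varargs TopicPartition...
    }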

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 98aa28a..17ba712 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -24,10 +24,10 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -118,13 +118,13 @@ public class Kafka010FetcherTest {
 
 		@SuppressWarnings("unchecked")
 		SourceContext<String> sourceContext = mock(SourceContext.class);
-		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
 		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
 		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
 				sourceContext,
-				topics,
-				null, /* no restored state */
+				partitionsWithInitialOffsets,
 				null, /* periodic assigner */
 				null, /* punctuated assigner */
 				new TestProcessingTimeService(),
@@ -136,7 +136,6 @@ public class Kafka010FetcherTest {
 				schema,
 				new Properties(),
 				0L,
-				StartupMode.GROUP_OFFSETS,
 				false);
 
 		// ----- run the fetcher -----
@@ -256,13 +255,13 @@ public class Kafka010FetcherTest {
 
 		@SuppressWarnings("unchecked")
 		SourceContext<String> sourceContext = mock(SourceContext.class);
-		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
 		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
 		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
 				sourceContext,
-				topics,
-				null, /* no restored state */
+				partitionsWithInitialOffsets,
 				null, /* periodic assigner */
 				null, /* punctuated assigner */
 				new TestProcessingTimeService(),
@@ -274,7 +273,6 @@ public class Kafka010FetcherTest {
 				schema,
 				new Properties(),
 				0L,
-				StartupMode.GROUP_OFFSETS,
 				false);
 
 		// ----- run the fetcher -----
@@ -372,13 +370,13 @@ public class Kafka010FetcherTest {
 		// ----- build a fetcher -----
 
 		BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
-		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
 		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
 		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
 				sourceContext,
-				topics,
-				null, /* no restored state */
+				partitionsWithInitialOffsets,
 				null, /* periodic watermark extractor */
 				null, /* punctuated watermark extractor */
 				new TestProcessingTimeService(),
@@ -390,7 +388,6 @@ public class Kafka010FetcherTest {
 				schema,
 				new Properties(),
 				0L,
-				StartupMode.GROUP_OFFSETS,
 				false);
 
 		// ----- run the fetcher -----

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index c0e4dd7..bf7ed02 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -45,10 +45,10 @@ import java.net.InetAddress;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
@@ -194,19 +194,23 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	@Override
 	protected AbstractFetcher<T, ?> createFetcher(
 			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> thisSubtaskPartitions,
-			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext) throws Exception {
 
 		boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false"));
 
-		return new Kafka08Fetcher<>(sourceContext,
-				thisSubtaskPartitions, restoredSnapshotState,
-				watermarksPeriodic, watermarksPunctuated,
-				runtimeContext, deserializer, kafkaProperties,
-				autoCommitInterval, startupMode, useMetrics);
+		return new Kafka08Fetcher<>(
+				sourceContext,
+				assignedPartitionsWithInitialOffsets,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				runtimeContext,
+				deserializer,
+				kafkaProperties,
+				autoCommitInterval,
+				useMetrics);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index ad520d8..de201e5 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -91,27 +91,23 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 
 	public Kafka08Fetcher(
 			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> assignedPartitions,
-			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long autoCommitInterval,
-			StartupMode startupMode,
 			boolean useMetrics) throws Exception
 	{
 		super(
 				sourceContext,
-				assignedPartitions,
-				restoredSnapshotState,
+				assignedPartitionsWithInitialOffsets,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				runtimeContext.getProcessingTimeService(),
 				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 				runtimeContext.getUserCodeClassLoader(),
-				startupMode,
 				useMetrics);
 
 		this.deserializer = checkNotNull(deserializer);
@@ -122,7 +118,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
 
 		// initially, all these partitions are not assigned to a specific broker connection
-		for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+		for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
 			unassignedPartitionsQueue.add(partition);
 		}
 	}
@@ -146,43 +142,32 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 		PeriodicOffsetCommitter periodicCommitter = null;
 		try {
 
-			// if we're not restored from a checkpoint, all partitions will not have their offset set;
-			// depending on the configured startup mode, accordingly set the starting offsets
-			if (!isRestored) {
-				switch (startupMode) {
-					case EARLIEST:
-						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-							partition.setOffset(OffsetRequest.EarliestTime());
-						}
-						break;
-					case LATEST:
-						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-							partition.setOffset(OffsetRequest.LatestTime());
-						}
-						break;
-					default:
-					case GROUP_OFFSETS:
-						List<KafkaTopicPartition> partitions = new ArrayList<>(subscribedPartitions().length);
-						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-							partitions.add(partition.getKafkaTopicPartition());
-						}
-
-						Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitions);
-						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-							Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
-							if (offset != null) {
-								// the committed offset in ZK represents the next record to process,
-								// so we subtract it by 1 to correctly represent internal state
-								partition.setOffset(offset - 1);
-							} else {
-								// if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
-								// we default to "auto.offset.reset" like the Kafka high-level consumer
-								LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
-									" resetting starting offset to 'auto.offset.reset'", partition);
-
-								partition.setOffset(invalidOffsetBehavior);
-							}
-						}
+			// offsets in the state may still be placeholder sentinel values if we are starting fresh, or if the
+			// checkpoint / savepoint state we were restored with had not yet been completely replaced with actual
+			// offset values; replace those with actual offsets, according to what each sentinel value represents.
+			for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
+				if (partition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
+					// this will be replaced by an actual offset in SimpleConsumerThread
+					partition.setOffset(OffsetRequest.EarliestTime());
+				} else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
+					// this will be replaced by an actual offset in SimpleConsumerThread
+					partition.setOffset(OffsetRequest.LatestTime());
+				} else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+					Long committedOffset = zookeeperOffsetHandler.getCommittedOffset(partition.getKafkaTopicPartition());
+					if (committedOffset != null) {
+						// the committed offset in ZK represents the next record to process,
+						// so we subtract it by 1 to correctly represent internal state
+						partition.setOffset(committedOffset - 1);
+					} else {
+						// if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
+						// we default to "auto.offset.reset" like the Kafka high-level consumer
+						LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
+							" resetting starting offset to 'auto.offset.reset'", partition);
+
+						partition.setOffset(invalidOffsetBehavior);
+					}
+				} else {
+					// the partition already has a specific start offset and is ready to be consumed
 				}
 			}
 
@@ -191,7 +176,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 				LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval);
 
 				periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler, 
-						subscribedPartitions(), errorHandler, autoCommitInterval);
+						subscribedPartitionStates(), errorHandler, autoCommitInterval);
 				periodicCommitter.setName("Periodic Kafka partition offset committer");
 				periodicCommitter.setDaemon(true);
 				periodicCommitter.start();
@@ -388,7 +373,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 		}
 
 		// Set committed offsets in topic partition state
-		KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitions();
+		KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitionStates();
 		for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
 			Long offset = offsets.get(partition.getKafkaTopicPartition());
 			if (offset != null) {
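
The "committedOffset - 1" above encodes the offset convention used throughout the connector: ZooKeeper (like Kafka itself) stores the offset of the next record to consume, while Flink's partition state stores the offset of the last record fully processed. A tiny demonstration of the invariant (values are made up):

    public class OffsetConventionDemo {
        public static void main(String[] args) {
            long committedInZk = 743L;            // ZK / Kafka convention: next record to consume
            long flinkState = committedInZk - 1;  // Flink state convention: last record processed

            // on (re)start the fetcher reads from flinkState + 1, which is exactly
            // the committed offset again, so no record is skipped or read twice
            long firstRecordToRead = flinkState + 1;
            System.out.println(firstRecordToRead == committedInZk);  // prints: true
        }
    }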

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index 8f2ef09..cec980f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -30,8 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -96,22 +94,12 @@ public class ZookeeperOffsetHandler {
 	}
 
 	/**
-	 * @param partitions The partitions to read offsets for.
-	 * @return The mapping from partition to offset.
+	 * @param partition The partition to read the offset for.
+	 * @return The offset committed to ZooKeeper for the given partition, or null if no offset was committed.
 	 * @throws Exception This method forwards exceptions.
 	 */
-	public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
-		Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size());
-		for (KafkaTopicPartition tp : partitions) {
-			Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
-
-			if (offset != null) {
-				LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.",
-						tp.getTopic(), tp.getPartition(), offset);
-				ret.put(tp, offset);
-			}
-		}
-		return ret;
+	public Long getCommittedOffset(KafkaTopicPartition partition) throws Exception {
+		return getOffsetFromZooKeeper(curatorClient, groupId, partition.getTopic(), partition.getPartition());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 9a61b91..c7236a2 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -171,8 +171,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 	@Override
 	protected AbstractFetcher<T, ?> createFetcher(
 			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> thisSubtaskPartitions,
-			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext) throws Exception {
@@ -181,8 +180,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 
 		return new Kafka09Fetcher<>(
 				sourceContext,
-				thisSubtaskPartitions,
-				restoredSnapshotState,
+				assignedPartitionsWithInitialOffsets,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				runtimeContext.getProcessingTimeService(),
@@ -194,7 +192,6 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 				deserializer,
 				properties,
 				pollTimeout,
-				startupMode,
 				useMetrics);
 		
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index b7c9bc2..c389486 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -22,7 +22,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
@@ -71,8 +70,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 
 	public Kafka09Fetcher(
 			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> assignedPartitions,
-			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			ProcessingTimeService processingTimeProvider,
@@ -84,19 +82,16 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
-			StartupMode startupMode,
 			boolean useMetrics) throws Exception
 	{
 		super(
 				sourceContext,
-				assignedPartitions,
-				restoredSnapshotState,
+				assignedPartitionsWithInitialOffsets,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				processingTimeProvider,
 				autoWatermarkInterval,
 				userCodeClassLoader,
-				startupMode,
 				useMetrics);
 
 		this.deserializer = deserializer;
@@ -114,13 +109,11 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 				LOG,
 				handover,
 				kafkaProperties,
-				subscribedPartitions(),
+				subscribedPartitionStates(),
 				kafkaMetricGroup,
 				createCallBridge(),
 				getFetcherName() + " for " + taskNameWithSubtasks,
 				pollTimeout,
-				startupMode,
-				isRestored,
 				useMetrics);
 	}
 
@@ -142,7 +135,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
 
 				// get the records for each topic partition
-				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
 
 					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
 							records.records(partition.getKafkaPartitionHandle());
@@ -226,7 +219,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 
 	@Override
 	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
-		KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
+		KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitionStates();
 		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
 
 		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
index a97b3cf..37ba34c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -39,16 +39,12 @@ public class KafkaConsumerCallBridge {
 		consumer.assign(topicPartitions);
 	}
 
-	public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
-		for (TopicPartition partition : partitions) {
-			consumer.seekToBeginning(partition);
-		}
+	public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
+		consumer.seekToBeginning(partition);
 	}
 
-	public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
-		for (TopicPartition partition : partitions) {
-			consumer.seekToEnd(partition);
-		}
+	public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
+		consumer.seekToEnd(partition);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index 03fe2c6..cbe1551 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.connectors.kafka.internal;
 
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -69,7 +69,7 @@ public class KafkaConsumerThread extends Thread {
 	private final Properties kafkaProperties;
 
 	/** The partitions that this consumer reads from */ 
-	private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
+	private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitionStates;
 
 	/** We get this from the outside to publish metrics. **/
 	private final MetricGroup kafkaMetricGroup;
@@ -80,12 +80,6 @@ public class KafkaConsumerThread extends Thread {
 	/** The maximum number of milliseconds to wait for a fetch batch */
 	private final long pollTimeout;
 
-	/** The configured startup mode (relevant only if we're restored from checkpoint / savepoint) */
-	private final StartupMode startupMode;
-
-	/** Flag whether or not we're restored from checkpoint / savepoint */
-	private final boolean isRestored;
-
 	/** Flag whether to add Kafka's metrics to the Flink metrics */
 	private final boolean useMetrics;
 
@@ -103,13 +97,11 @@ public class KafkaConsumerThread extends Thread {
 			Logger log,
 			Handover handover,
 			Properties kafkaProperties,
-			KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions,
+			KafkaTopicPartitionState<TopicPartition>[] subscribedPartitionStates,
 			MetricGroup kafkaMetricGroup,
 			KafkaConsumerCallBridge consumerCallBridge,
 			String threadName,
 			long pollTimeout,
-			StartupMode startupMode,
-			boolean isRestored,
 			boolean useMetrics) {
 
 		super(threadName);
@@ -120,21 +112,8 @@ public class KafkaConsumerThread extends Thread {
 		this.kafkaProperties = checkNotNull(kafkaProperties);
 		this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
 		this.consumerCallBridge = checkNotNull(consumerCallBridge);
-		this.startupMode = checkNotNull(startupMode);
-
-		this.subscribedPartitions = checkNotNull(subscribedPartitions);
-		this.isRestored = isRestored;
-
-		// if we are restoring from a checkpoint / savepoint, all
-		// subscribed partitions' state should have defined offsets
-		if (isRestored) {
-			for (KafkaTopicPartitionState<TopicPartition> subscribedPartition : subscribedPartitions) {
-				if (!subscribedPartition.isOffsetDefined()) {
-					throw new IllegalArgumentException("Restoring from a checkpoint / savepoint, but found a " +
-						"partition state " + subscribedPartition + " that does not have a defined offset.");
-				}
-			}
-		}
+
+		this.subscribedPartitionStates = checkNotNull(subscribedPartitionStates);
 
 		this.pollTimeout = pollTimeout;
 		this.useMetrics = useMetrics;
@@ -173,7 +152,7 @@ public class KafkaConsumerThread extends Thread {
 			final OffsetCommitCallback offsetCommitCallback = new CommitCallback();
 
 			// tell the consumer which partitions to work with
-			consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));
+			consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitionStates));
 
 			// register Kafka's very own metrics in Flink's metric reporters
 			if (useMetrics) {
@@ -195,39 +174,23 @@ public class KafkaConsumerThread extends Thread {
 				return;
 			}
 
-			if (isRestored) {
-				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
-					log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
-						"to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
-
-					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
-				}
-			} else {
-				List<TopicPartition> partitionList = convertKafkaPartitions(subscribedPartitions);
-
-				// fetch offsets from Kafka, depending on the configured startup mode
-				switch (startupMode) {
-					case EARLIEST:
-						log.info("Setting starting point as earliest offset for partitions {}", partitionList);
-
-						consumerCallBridge.seekPartitionsToBeginning(consumer, partitionList);
-						break;
-					case LATEST:
-						log.info("Setting starting point as latest offset for partitions {}", partitionList);
-
-						consumerCallBridge.seekPartitionsToEnd(consumer, partitionList);
-						break;
-					default:
-					case GROUP_OFFSETS:
-						log.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}",
-							kafkaProperties.getProperty("group.id"), partitionList);
-				}
+			// offsets in the state may still be placeholder sentinel values if we are starting fresh, or if the
+			// checkpoint / savepoint state we were restored with had not yet been completely replaced with actual
+			// offset values; replace those with actual offsets, according to what each sentinel value represents.
+			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates) {
+				if (partition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
+					consumerCallBridge.seekPartitionToBeginning(consumer, partition.getKafkaPartitionHandle());
+					partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
+				} else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
+					consumerCallBridge.seekPartitionToEnd(consumer, partition.getKafkaPartitionHandle());
+					partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
+				} else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+					// the KafkaConsumer by default will automatically seek the consumer position
+					// to the committed group offset, so we do not need to do it.
 
-				// on startup, all partition states will not have defined offsets;
-				// set the initial states with the offsets fetched from Kafka
-				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
-					// the fetched offset represents the next record to process, so we need to subtract it by 1
 					partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
+				} else {
+					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
 				}
 			}
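
Note the resolution order for the 0.9+ consumer thread: seek first, then read back consumer.position() and store position - 1, so that the state again means "last record processed". A condensed, hypothetical sketch of the dispatch above, written against the 0.10 client API (sentinel constants are placeholders for KafkaTopicPartitionStateSentinel):

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class StartupDispatchSketch {

        static final long EARLIEST = -2L, LATEST = -1L, GROUP = -3L;  // placeholder sentinels

        static long resolveStartOffset(KafkaConsumer<?, ?> consumer, TopicPartition tp, long stateOffset) {
            if (stateOffset == EARLIEST) {
                consumer.seekToBeginning(Collections.singletonList(tp));
                return consumer.position(tp) - 1;  // position() is the next record to read
            } else if (stateOffset == LATEST) {
                consumer.seekToEnd(Collections.singletonList(tp));
                return consumer.position(tp) - 1;
            } else if (stateOffset == GROUP) {
                // the consumer initializes its position to the committed group offset on its own
                return consumer.position(tp) - 1;
            } else {
                consumer.seek(tp, stateOffset + 1);  // concrete restored offset: continue after it
                return stateOffset;
            }
        }
    }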
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index abd75cc..49144e6 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -24,10 +24,10 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -118,13 +118,13 @@ public class Kafka09FetcherTest {
 
 		@SuppressWarnings("unchecked")
 		SourceContext<String> sourceContext = mock(SourceContext.class);
-		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
 		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
 		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
 				sourceContext,
-				topics,
-				null, /* no restored state */
+				partitionsWithInitialOffsets,
 				null, /* periodic watermark extractor */
 				null, /* punctuated watermark extractor */
 				new TestProcessingTimeService(),
@@ -136,7 +136,6 @@ public class Kafka09FetcherTest {
 				schema,
 				new Properties(),
 				0L,
-				StartupMode.GROUP_OFFSETS,
 				false);
 
 		// ----- run the fetcher -----
@@ -256,13 +255,13 @@ public class Kafka09FetcherTest {
 
 		@SuppressWarnings("unchecked")
 		SourceContext<String> sourceContext = mock(SourceContext.class);
-		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
 		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
 		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
 				sourceContext,
-				topics,
-				null, /* no restored state */
+				partitionsWithInitialOffsets,
 				null, /* periodic watermark extractor */
 				null, /* punctuated watermark extractor */
 				new TestProcessingTimeService(),
@@ -274,7 +273,6 @@ public class Kafka09FetcherTest {
 				schema,
 				new Properties(),
 				0L,
-				StartupMode.GROUP_OFFSETS,
 				false);
 
 		// ----- run the fetcher -----
@@ -372,13 +370,13 @@ public class Kafka09FetcherTest {
 		// ----- build a fetcher -----
 
 		BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
-		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
 		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
 		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
 				sourceContext,
-				topics,
-				null, /* no restored state */
+				partitionsWithInitialOffsets,
 				null, /* periodic watermark extractor */
 				null, /* punctuated watermark extractor */
 				new TestProcessingTimeService(),
@@ -390,7 +388,6 @@ public class Kafka09FetcherTest {
 				schema,
 				new Properties(),
 				0L,
-				StartupMode.GROUP_OFFSETS,
 				false);
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 1121d1b..144ede8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -40,15 +40,11 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -90,8 +86,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	/** The schema to convert between Kafka's byte messages, and Flink's objects */
 	protected final KeyedDeserializationSchema<T> deserializer;
 
-	/** The set of topic partitions that the source will read */
-	private List<KafkaTopicPartition> subscribedPartitions;
+	/** The set of topic partitions that the source will read, with their initial offsets to start reading from */
+	private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
 	
 	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
 	 * to exploit per-partition timestamp characteristics.
@@ -138,17 +134,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
 	}
 
-	/**
-	 * This method must be called from the subclasses, to set the list of all subscribed partitions
-	 * that this consumer will fetch from (across all subtasks).
-	 * 
-	 * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from.
-	 */
-	protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
-		checkNotNull(allSubscribedPartitions);
-		this.subscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions);
-	}
-
 	// ------------------------------------------------------------------------
 	//  Configuration
 	// ------------------------------------------------------------------------
@@ -263,17 +248,67 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	// ------------------------------------------------------------------------
 
 	@Override
+	public void open(Configuration configuration) {
+		List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
+
+		if (kafkaTopicPartitions != null) {
+			subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
+
+			if (restoredState != null) {
+				for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
+					if (restoredState.containsKey(kafkaTopicPartition)) {
+						subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
+					}
+				}
+
+				LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
+					getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
+			} else {
+				initializeSubscribedPartitionsToStartOffsets(
+					subscribedPartitionsToStartOffsets,
+					kafkaTopicPartitions,
+					getRuntimeContext().getIndexOfThisSubtask(),
+					getRuntimeContext().getNumberOfParallelSubtasks(),
+					startupMode);
+
+				if (subscribedPartitionsToStartOffsets.size() != 0) {
+					switch (startupMode) {
+						case EARLIEST:
+							LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
+								getRuntimeContext().getIndexOfThisSubtask(),
+								subscribedPartitionsToStartOffsets.size(),
+								subscribedPartitionsToStartOffsets.keySet());
+							break;
+						case LATEST:
+							LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
+								getRuntimeContext().getIndexOfThisSubtask(),
+								subscribedPartitionsToStartOffsets.size(),
+								subscribedPartitionsToStartOffsets.keySet());
+							break;
+						default:
+						case GROUP_OFFSETS:
+							LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
+								getRuntimeContext().getIndexOfThisSubtask(),
+								subscribedPartitionsToStartOffsets.size(),
+								subscribedPartitionsToStartOffsets.keySet());
+					}
+				}
+			}
+		}
+	}
+
+	@Override
 	public void run(SourceContext<T> sourceContext) throws Exception {
-		if (subscribedPartitions == null) {
+		if (subscribedPartitionsToStartOffsets == null) {
 			throw new Exception("The partitions were not set for the consumer");
 		}
 
 		// we need only do work, if we actually have partitions assigned
-		if (!subscribedPartitions.isEmpty()) {
+		if (!subscribedPartitionsToStartOffsets.isEmpty()) {
 
 			// create the fetcher that will communicate with the Kafka brokers
 			final AbstractFetcher<T, ?> fetcher = createFetcher(
-					sourceContext, subscribedPartitions, restoredState,
+					sourceContext, subscribedPartitionsToStartOffsets,
 					periodicWatermarkAssigner, punctuatedWatermarkAssigner,
 					(StreamingRuntimeContext) getRuntimeContext());
 
@@ -327,15 +362,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@Override
-	public void open(Configuration configuration) {
-		List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
-
-		if (kafkaTopicPartitions != null) {
-			assignTopicPartitions(kafkaTopicPartitions);
-		}
-	}
-
-	@Override
 	public void close() throws Exception {
 		// pretty much the same logic as cancelling
 		try {
@@ -386,18 +412,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			if (fetcher == null) {
 				// the fetcher has not yet been initialized, which means we need to return the
 				// originally restored offsets or the assigned partitions
-
-				if (restoredState != null) {
-
-					for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoredState.entrySet()) {
-						offsetsStateForCheckpoint.add(
-								Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
-					}
-				} else if (subscribedPartitions != null) {
-					for (KafkaTopicPartition subscribedPartition : subscribedPartitions) {
-						offsetsStateForCheckpoint.add(
-								Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
-					}
+				for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
+					offsetsStateForCheckpoint.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
 				}
 
 				// the map cannot be asynchronously updated, because only one checkpoint call can happen
@@ -493,7 +509,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * data, and emits it into the data streams.
 	 * 
 	 * @param sourceContext The source context to emit data to.
-	 * @param thisSubtaskPartitions The set of partitions that this subtask should handle.
+	 * @param subscribedPartitionsToStartOffsets The partitions that this subtask should handle, mapped to the offsets to start reading from.
 	 * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
 	 * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
 	 * @param runtimeContext The task's runtime context.
@@ -504,8 +520,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 */
 	protected abstract AbstractFetcher<T, ?> createFetcher(
 			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> thisSubtaskPartitions,
-			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext) throws Exception;
@@ -525,60 +540,33 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
-		subscribedPartitions = new ArrayList<>();
-
-		if (restoredState != null) {
-			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
-				if (restoredState.containsKey(kafkaTopicPartition)) {
-					subscribedPartitions.add(kafkaTopicPartition);
-				}
-			}
-		} else {
-			Collections.sort(kafkaTopicPartitions, new Comparator<KafkaTopicPartition>() {
-				@Override
-				public int compare(KafkaTopicPartition o1, KafkaTopicPartition o2) {
-					int topicComparison = o1.getTopic().compareTo(o2.getTopic());
-
-					if (topicComparison == 0) {
-						return o1.getPartition() - o2.getPartition();
-					} else {
-						return topicComparison;
-					}
-				}
-			});
-
-			for (int i = getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i += getRuntimeContext().getNumberOfParallelSubtasks()) {
-				subscribedPartitions.add(kafkaTopicPartitions.get(i));
-			}
-		}
-	}
-
 	/**
-	 * Selects which of the given partitions should be handled by a specific consumer,
-	 * given a certain number of consumers.
-	 * 
-	 * @param allPartitions The partitions to select from
-	 * @param numConsumers The number of consumers
-	 * @param consumerIndex The index of the specific consumer
-	 * 
-	 * @return The sublist of partitions to be handled by that consumer.
+	 * Initializes {@link FlinkKafkaConsumerBase#subscribedPartitionsToStartOffsets} with appropriate
+	 * values. The method decides which partitions this consumer instance should subscribe to, and
+	 * determines the initial offset each subscribed partition should start from, based on the configured startup mode.
+	 *
+	 * Note: This method is exposed with protected visibility so that it can also be used in tests.
+	 *
+	 * @param subscribedPartitionsToStartOffsets the partition-to-start-offset map to initialize
+	 * @param kafkaTopicPartitions the complete list of all Kafka partitions
+	 * @param indexOfThisSubtask the index of this consumer instance
+	 * @param numParallelSubtasks total number of parallel consumer instances
+	 * @param startupMode the configured startup mode for the consumer
 	 */
-	protected static List<KafkaTopicPartition> assignPartitions(
-			List<KafkaTopicPartition> allPartitions,
-			int numConsumers, int consumerIndex) {
-		final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
-				allPartitions.size() / numConsumers + 1);
-
-		for (int i = 0; i < allPartitions.size(); i++) {
-			if (i % numConsumers == consumerIndex) {
-				thisSubtaskPartitions.add(allPartitions.get(i));
+	protected static void initializeSubscribedPartitionsToStartOffsets(
+			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
+			List<KafkaTopicPartition> kafkaTopicPartitions,
+			int indexOfThisSubtask,
+			int numParallelSubtasks,
+			StartupMode startupMode) {
+
+		for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
+			if (i % numParallelSubtasks == indexOfThisSubtask) {
+				subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
 			}
 		}
-		
-		return thisSubtaskPartitions;
 	}
-	
+
 	/**
 	 * Logs the partition information in INFO level.
 	 * 
@@ -607,8 +595,17 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@VisibleForTesting
-	List<KafkaTopicPartition> getSubscribedPartitions() {
-		return subscribedPartitions;
+	void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
+		checkNotNull(allSubscribedPartitions);
+		this.subscribedPartitionsToStartOffsets = new HashMap<>();
+		for (KafkaTopicPartition partition : allSubscribedPartitions) {
+			this.subscribedPartitionsToStartOffsets.put(partition, null);
+		}
+	}
+
+	@VisibleForTesting
+	Map<KafkaTopicPartition, Long> getSubscribedPartitionsToStartOffsets() {
+		return subscribedPartitionsToStartOffsets;
 	}
 
 	@VisibleForTesting
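
For illustration, a minimal standalone sketch of the round-robin assignment rule used by
initializeSubscribedPartitionsToStartOffsets. Plain strings stand in for KafkaTopicPartition,
and the GROUP_OFFSET constant is copied from KafkaTopicPartitionStateSentinel; everything else
is hypothetical scaffolding, not connector code.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignmentSketch {

	public static void main(String[] args) {
		// partition ids stand in for KafkaTopicPartition, for illustration only
		List<String> allPartitions = new ArrayList<>();
		allPartitions.add("abc-0");
		allPartitions.add("abc-1");
		allPartitions.add("def-0");

		final int numParallelSubtasks = 2;
		final long groupOffsetSentinel = -915623761773L; // GROUP_OFFSET sentinel

		for (int subtask = 0; subtask < numParallelSubtasks; subtask++) {
			Map<String, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
			// the same round-robin rule used by initializeSubscribedPartitionsToStartOffsets
			for (int i = 0; i < allPartitions.size(); i++) {
				if (i % numParallelSubtasks == subtask) {
					subscribedPartitionsToStartOffsets.put(allPartitions.get(i), groupOffsetSentinel);
				}
			}
			System.out.println("subtask " + subtask + " -> " + subscribedPartitionsToStartOffsets);
		}
	}
}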

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
index 331c1a6..f796e62 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -16,18 +16,30 @@
  */
 package org.apache.flink.streaming.connectors.kafka.config;
 
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+
 /**
  * Startup modes for the Kafka Consumer.
  */
 public enum StartupMode {
 
 	/** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default) */
-	GROUP_OFFSETS,
+	GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),
 
 	/** Start from the earliest offset possible */
-	EARLIEST,
+	EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
 
 	/** Start from the latest offset */
-	LATEST
-	
+	LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+	/** The sentinel offset value corresponding to this startup mode. */
+	private final long stateSentinel;
+
+	StartupMode(long stateSentinel) {
+		this.stateSentinel = stateSentinel;
+	}
+
+	public long getStateSentinel() {
+		return stateSentinel;
+	}
 }
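
A hedged usage sketch of the new accessor; the caller below is hypothetical, but StartupMode
and KafkaTopicPartitionStateSentinel are the classes touched by this commit, so this compiles
against flink-connector-kafka-base once the change is in.

import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;

public class StartupModeSentinelDemo {

	public static void main(String[] args) {
		// each startup mode now resolves directly to its placeholder offset,
		// so callers no longer need to switch over the enum
		System.out.println(StartupMode.GROUP_OFFSETS.getStateSentinel()
				== KafkaTopicPartitionStateSentinel.GROUP_OFFSET);    // true
		System.out.println(StartupMode.EARLIEST.getStateSentinel()
				== KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); // true
		System.out.println(StartupMode.LATEST.getStateSentinel()
				== KafkaTopicPartitionStateSentinel.LATEST_OFFSET);   // true
	}
}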

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index b27e996..e021881 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -26,12 +26,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,20 +61,14 @@ public abstract class AbstractFetcher<T, KPH> {
 	protected final Object checkpointLock;
 
 	/** All partitions (and their state) that this fetcher is subscribed to */
-	private final KafkaTopicPartitionState<KPH>[] allPartitions;
+	private final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates;
 
 	/** The mode describing whether the fetcher also generates timestamps and watermarks */
 	protected final int timestampWatermarkMode;
 
-	/** The startup mode for the consumer (only relevant if the consumer wasn't restored) */
-	protected final StartupMode startupMode;
-
 	/** Flag whether to register metrics for the fetcher */
 	protected final boolean useMetrics;
 
-	/** Flag whether or not the consumer state was restored from a checkpoint / savepoint */
-	protected final boolean isRestored;
-
 	/** Only relevant for punctuated watermarks: The current cross partition watermark */
 	private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
 
@@ -84,19 +76,16 @@ public abstract class AbstractFetcher<T, KPH> {
 	
 	protected AbstractFetcher(
 			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> assignedPartitions,
-			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			ProcessingTimeService processingTimeProvider,
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
-			StartupMode startupMode,
 			boolean useMetrics) throws Exception
 	{
 		this.sourceContext = checkNotNull(sourceContext);
 		this.checkpointLock = sourceContext.getCheckpointLock();
-		this.startupMode = checkNotNull(startupMode);
 		this.useMetrics = useMetrics;
 		
 		// figure out what we watermark mode we will be using
@@ -115,30 +104,25 @@ public abstract class AbstractFetcher<T, KPH> {
 				throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
 			}
 		}
-		
+
 		// create our partition state according to the timestamp/watermark mode 
-		this.allPartitions = initializePartitions(
-				assignedPartitions,
+		this.subscribedPartitionStates = initializeSubscribedPartitionStates(
+				assignedPartitionsWithInitialOffsets,
 				timestampWatermarkMode,
 				watermarksPeriodic, watermarksPunctuated,
 				userCodeClassLoader);
 
-		if (restoredSnapshotState != null) {
-			for (KafkaTopicPartitionState<?> partition : allPartitions) {
-				Long offset = restoredSnapshotState.get(partition.getKafkaTopicPartition());
-				if (offset != null) {
-					partition.setOffset(offset);
-				}
+		// check that all partition states have a defined offset
+		for (KafkaTopicPartitionState<?> partitionState : subscribedPartitionStates) {
+			if (!partitionState.isOffsetDefined()) {
+				throw new IllegalArgumentException("The fetcher was assigned partitions with undefined initial offsets.");
 			}
-			this.isRestored = true;
-		} else {
-			this.isRestored = false;
 		}
 		
 		// if we have periodic watermarks, kick off the interval scheduler
 		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
 			KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = 
-					(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
+					(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) subscribedPartitionStates;
 			
 			PeriodicWatermarkEmitter periodicEmitter = 
 					new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
@@ -155,8 +139,8 @@ public abstract class AbstractFetcher<T, KPH> {
 	 *
 	 * @return All subscribed partitions.
 	 */
-	protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
-		return allPartitions;
+	protected final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates() {
+		return subscribedPartitionStates;
 	}
 
 	// ------------------------------------------------------------------------
@@ -207,8 +191,8 @@ public abstract class AbstractFetcher<T, KPH> {
 		// this method assumes that the checkpoint lock is held
 		assert Thread.holdsLock(checkpointLock);
 
-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
+		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(subscribedPartitionStates.length);
+		for (KafkaTopicPartitionState<?> partition : subscribedPartitionStates()) {
 			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
 		}
 		return state;
@@ -343,7 +327,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
 			long newMin = Long.MAX_VALUE;
 
-			for (KafkaTopicPartitionState<?> state : allPartitions) {
+			for (KafkaTopicPartitionState<?> state : subscribedPartitionStates) {
 				@SuppressWarnings("unchecked")
 				final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
 						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
@@ -371,8 +355,8 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * Utility method that takes the topic partitions and creates the topic partition state
 	 * holders. If a watermark generator per partition exists, this will also initialize those.
 	 */
-	private KafkaTopicPartitionState<KPH>[] initializePartitions(
-			List<KafkaTopicPartition> assignedPartitions,
+	private KafkaTopicPartitionState<KPH>[] initializeSubscribedPartitionStates(
+			Map<KafkaTopicPartition, Long> assignedPartitionsToInitialOffsets,
 			int timestampWatermarkMode,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
@@ -384,13 +368,16 @@ public abstract class AbstractFetcher<T, KPH> {
 			case NO_TIMESTAMPS_WATERMARKS: {
 				@SuppressWarnings("unchecked")
 				KafkaTopicPartitionState<KPH>[] partitions =
-						(KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
+						(KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitionsToInitialOffsets.size()];
 
 				int pos = 0;
-				for (KafkaTopicPartition partition : assignedPartitions) {
+				for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
 					// create the kafka version specific partition handle
-					KPH kafkaHandle = createKafkaPartitionHandle(partition);
-					partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle);
+					KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
+					partitions[pos] = new KafkaTopicPartitionState<>(partition.getKey(), kafkaHandle);
+					partitions[pos].setOffset(partition.getValue());
+
+					pos++;
 				}
 
 				return partitions;
@@ -400,17 +387,20 @@ public abstract class AbstractFetcher<T, KPH> {
 				@SuppressWarnings("unchecked")
 				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
 						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
-								new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
+								new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()];
 
 				int pos = 0;
-				for (KafkaTopicPartition partition : assignedPartitions) {
-					KPH kafkaHandle = createKafkaPartitionHandle(partition);
+				for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
+					KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
 
 					AssignerWithPeriodicWatermarks<T> assignerInstance =
 							watermarksPeriodic.deserializeValue(userCodeClassLoader);
 					
-					partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
-							partition, kafkaHandle, assignerInstance);
+					partitions[pos] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+							partition.getKey(), kafkaHandle, assignerInstance);
+					partitions[pos].setOffset(partition.getValue());
+
+					pos++;
 				}
 
 				return partitions;
@@ -420,17 +410,20 @@ public abstract class AbstractFetcher<T, KPH> {
 				@SuppressWarnings("unchecked")
 				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
 						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
-								new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()];
+								new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()];
 
 				int pos = 0;
-				for (KafkaTopicPartition partition : assignedPartitions) {
-					KPH kafkaHandle = createKafkaPartitionHandle(partition);
+				for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
+					KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
 
 					AssignerWithPunctuatedWatermarks<T> assignerInstance =
 							watermarksPunctuated.deserializeValue(userCodeClassLoader);
 
-					partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
-							partition, kafkaHandle, assignerInstance);
+					partitions[pos] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+							partition.getKey(), kafkaHandle, assignerInstance);
+					partitions[pos].setOffset(partition.getValue());
+
+					pos++;
 				}
 
 				return partitions;
@@ -452,7 +445,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		// add current offsets to gauge
 		MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
 		MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
-		for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
+		for (KafkaTopicPartitionState<?> ktp: subscribedPartitionStates()) {
 			currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
 			committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
 		}
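
A minimal sketch of the constructor's new invariant, pulled out of AbstractFetcher for
illustration. The check mirrors the loop added above; the harness around it is hypothetical,
while KafkaTopicPartition, KafkaTopicPartitionState and the sentinel class are from this commit.

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;

public class FetcherInvariantSketch {

	// every assigned partition must carry either a concrete restored offset or one of
	// the startup sentinels; a state left at OFFSET_NOT_SET is rejected
	static void checkOffsetsDefined(KafkaTopicPartitionState<?>[] subscribedPartitionStates) {
		for (KafkaTopicPartitionState<?> partitionState : subscribedPartitionStates) {
			if (!partitionState.isOffsetDefined()) {
				throw new IllegalArgumentException(
						"The fetcher was assigned partitions with undefined initial offsets.");
			}
		}
	}

	public static void main(String[] args) {
		KafkaTopicPartitionState<Object> restored =
				new KafkaTopicPartitionState<>(new KafkaTopicPartition("abc", 13), new Object());
		restored.setOffset(16768L); // concrete offset restored from a checkpoint

		KafkaTopicPartitionState<Object> fresh =
				new KafkaTopicPartitionState<>(new KafkaTopicPartition("def", 7), new Object());
		fresh.setOffset(KafkaTopicPartitionStateSentinel.GROUP_OFFSET); // startup sentinel

		checkOffsetsDefined(new KafkaTopicPartitionState<?>[] {restored, fresh}); // passes
	}
}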

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
index 7cb5f46..adfbf79 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -28,10 +28,6 @@ package org.apache.flink.streaming.connectors.kafka.internals;
  * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
  */
 public class KafkaTopicPartitionState<KPH> {
-
-	/** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
-	 * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
-	public static final long OFFSET_NOT_SET = -915623761776L;
 	
 	// ------------------------------------------------------------------------
 
@@ -52,8 +48,8 @@ public class KafkaTopicPartitionState<KPH> {
 	public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
 		this.partition = partition;
 		this.kafkaPartitionHandle = kafkaPartitionHandle;
-		this.offset = OFFSET_NOT_SET;
-		this.committedOffset = OFFSET_NOT_SET;
+		this.offset = KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
+		this.committedOffset = KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
 	}
 
 	// ------------------------------------------------------------------------
@@ -96,7 +92,7 @@ public class KafkaTopicPartitionState<KPH> {
 	}
 
 	public final boolean isOffsetDefined() {
-		return offset != OFFSET_NOT_SET;
+		return offset != KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
 	}
 
 	public final void setCommittedOffset(long offset) {
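
Note that isOffsetDefined() treats the three startup sentinels as defined offsets, since only
OFFSET_NOT_SET is the "unset" magic value; this is exactly what the constructor check in
AbstractFetcher relies on. A small demonstration, assuming the classes from this commit:

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;

public class OffsetDefinedDemo {

	public static void main(String[] args) {
		KafkaTopicPartitionState<Object> state =
				new KafkaTopicPartitionState<>(new KafkaTopicPartition("abc", 13), new Object());

		System.out.println(state.isOffsetDefined()); // false: fresh state is OFFSET_NOT_SET

		state.setOffset(KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
		System.out.println(state.isOffsetDefined()); // true: sentinels count as defined
	}
}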

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
new file mode 100644
index 0000000..153a326
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * Magic values used to represent special offset states before partitions are actually read.
+ *
+ * The values are all negative. Negative offsets are not used by Kafka (they are invalid), so we
+ * pick numbers that are probably (hopefully) not used by Kafka as magic values for anything else.
+ */
+public class KafkaTopicPartitionStateSentinel {
+
+	/** Magic number that defines an unset offset. */
+	public static final long OFFSET_NOT_SET = -915623761776L;
+
+	/**
+	 * Magic number that defines the partition should start from the earliest offset.
+	 *
+	 * This is used as a placeholder so that the actual earliest offset can be evaluated lazily
+	 * when the partition will actually start to be read by the consumer.
+	 */
+	public static final long EARLIEST_OFFSET = -915623761775L;
+
+	/**
+	 * Magic number that defines the partition should start from the latest offset.
+	 *
+	 * This is used as a placeholder so that the actual latest offset can be evaluated lazily
+	 * when the partition will actually start to be read by the consumer.
+	 */
+	public static final long LATEST_OFFSET = -915623761774L;
+
+	/**
+	 * Magic number that defines the partition should start from its committed group offset in Kafka.
+	 *
+	 * This is used as a placeholder so that the actual committed group offset can be evaluated lazily
+	 * when the partition will actually start to be read by the consumer.
+	 */
+	public static final long GROUP_OFFSET = -915623761773L;
+
+}
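
The sentinels are placeholders that get resolved lazily, once the consumer actually owns the
partition. A hedged sketch of that resolution; the seek/fetch helpers below are placeholders,
not the connector's real API, and only the sentinel constants come from this commit.

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;

public class SentinelResolutionSketch {

	// translate a sentinel (or a concrete restored offset) into a starting position
	static long resolveStartOffset(long sentinelOrOffset) {
		if (sentinelOrOffset == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
			return seekToBeginning();
		} else if (sentinelOrOffset == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
			return seekToEnd();
		} else if (sentinelOrOffset == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
			return fetchCommittedGroupOffset();
		} else {
			// a concrete restored offset: use it as-is
			return sentinelOrOffset;
		}
	}

	private static long seekToBeginning() { return 0L; }            // placeholder
	private static long seekToEnd() { return 42L; }                 // placeholder
	private static long fetchCommittedGroupOffset() { return 7L; }  // placeholder

	public static void main(String[] args) {
		System.out.println(resolveStartOffset(KafkaTopicPartitionStateSentinel.LATEST_OFFSET)); // 42
		System.out.println(resolveStartOffset(16768L)); // 16768
	}
}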

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 38a3ce8..20411e1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
@@ -34,6 +35,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
@@ -68,8 +70,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		testHarness.open();
 
 		// assert that no partitions were found and the resulting map is empty
-		Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null);
-		Assert.assertTrue(consumerFunction.getSubscribedPartitions().isEmpty());
+		Assert.assertNotNull(consumerFunction.getSubscribedPartitionsToStartOffsets());
+		Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
 
 		// assert that no state was restored
 		Assert.assertTrue(consumerFunction.getRestoredState() == null);
@@ -101,10 +103,16 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
 		testHarness.open();
 
+		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot-empty-state";
+		// since the state is empty, the consumer should fall back to the startup mode to determine start offsets.
+		final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
+		expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("abc", 13), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("def", 7), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+
 		// assert that partitions were assigned and match the expected map
-		Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null);
-		Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
-		Assert.assertTrue(consumerFunction.getSubscribedPartitions().equals(partitions));
+		Assert.assertNotNull(consumerFunction.getSubscribedPartitionsToStartOffsets());
+		Assert.assertFalse(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+		Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets());
 
 		// assert that no state was restored
 		Assert.assertTrue(consumerFunction.getRestoredState() == null);
@@ -136,16 +144,18 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
 		testHarness.open();
 
-		// assert that there are partitions and is identical to expected list
-		Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null);
-		Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
-		Assert.assertEquals(partitions, consumerFunction.getSubscribedPartitions());
-
 		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
 		final HashMap<KafkaTopicPartition, Long> expectedState = new HashMap<>();
 		expectedState.put(new KafkaTopicPartition("abc", 13), 16768L);
 		expectedState.put(new KafkaTopicPartition("def", 7), 987654321L);
 
+		// assert that partitions were assigned and that the map is non-empty
+		Assert.assertNotNull(consumerFunction.getSubscribedPartitionsToStartOffsets());
+		Assert.assertFalse(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+
+		// on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
+		Assert.assertEquals(expectedState, consumerFunction.getSubscribedPartitionsToStartOffsets());
+
 		// assert that state is correctly restored from legacy checkpoint
 		Assert.assertTrue(consumerFunction.getRestoredState() != null);
 		Assert.assertEquals(expectedState, consumerFunction.getRestoredState());
@@ -179,8 +189,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		@Override
 		protected AbstractFetcher<T, ?> createFetcher(
 				SourceContext<T> sourceContext,
-				List<KafkaTopicPartition> thisSubtaskPartitions,
-				HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+				Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				StreamingRuntimeContext runtimeContext) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 980a025..e6ea63f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -42,13 +43,14 @@ import org.mockito.stubbing.Answer;
 
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -137,6 +132,8 @@ public class FlinkKafkaConsumerBaseTest {
 
 		consumer.initializeState(initializationContext);
 
+		consumer.open(new Configuration());
+
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
 
 		// ensure that the list was cleared and refilled. while this is an implementation detail, we use it here
@@ -177,6 +174,8 @@ public class FlinkKafkaConsumerBaseTest {
 
 		consumer.initializeState(initializationContext);
 
+		consumer.open(new Configuration());
+
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
 
 		assertFalse(listState.get().iterator().hasNext());
@@ -364,15 +363,10 @@ public class FlinkKafkaConsumerBaseTest {
 		@SuppressWarnings("unchecked")
 		protected AbstractFetcher<T, ?> createFetcher(
 				SourceContext<T> sourceContext,
-				List<KafkaTopicPartition> thisSubtaskPartitions,
-				HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
+				Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				StreamingRuntimeContext runtimeContext) throws Exception {
-			if (restoredSnapshotState != null) {
-				Assert.fail("Trying to restore offsets even though there was no restore state.");
-				return null;
-			}
 			return mock(AbstractFetcher.class);
 		}