Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/15 15:44:14 UTC

[2/2] flink git commit: [FLINK-4280] [kafka] Explicit start position configuration for FlinkKafkaConsumer

[FLINK-4280] [kafka] Explicit start position configuration for FlinkKafkaConsumer

This closes #2509.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7477c5b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7477c5b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7477c5b5

Branch: refs/heads/master
Commit: 7477c5b57e3201a7c2f9d256e845c7328537ac7f
Parents: 6c310a7
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sat Sep 17 21:41:50 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Feb 15 23:42:44 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  55 +++-
 .../connectors/kafka/FlinkKafkaConsumer010.java |   4 +
 .../kafka/internal/Kafka010Fetcher.java         |   6 +
 .../internal/KafkaConsumerCallBridge010.java    |  10 +
 .../connectors/kafka/Kafka010FetcherTest.java   |  93 +++---
 .../connectors/kafka/Kafka010ITCase.java        |  19 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  20 +-
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  34 ++-
 .../kafka/internals/Kafka08Fetcher.java         |  80 +++--
 .../kafka/internals/SimpleConsumerThread.java   | 126 +++++---
 .../connectors/kafka/Kafka08ITCase.java         |  28 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  18 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |   4 +
 .../kafka/internal/Kafka09Fetcher.java          |   8 +-
 .../kafka/internal/KafkaConsumerCallBridge.java |  15 +-
 .../kafka/internal/KafkaConsumerThread.java     |  71 +++--
 .../connectors/kafka/Kafka09FetcherTest.java    |   8 +-
 .../connectors/kafka/Kafka09ITCase.java         |  18 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  20 +-
 .../kafka/FlinkKafkaConsumerBase.java           |  90 ++++--
 .../connectors/kafka/config/StartupMode.java    |  33 ++
 .../kafka/internals/AbstractFetcher.java        |  36 ++-
 .../FlinkKafkaConsumerBaseMigrationTest.java    | 301 ++++---------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  23 +-
 .../connectors/kafka/KafkaConsumerTestBase.java | 267 ++++++++++++++--
 .../kafka/KafkaShortRetentionTestBase.java      |   1 -
 .../connectors/kafka/KafkaTestEnvironment.java  |   3 +-
 .../AbstractFetcherTimestampsTest.java          |  17 +-
 .../testutils/JobManagerCommunicationUtils.java |  27 +-
 29 files changed, 944 insertions(+), 491 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index a727f85..0f700ab 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -164,11 +164,64 @@ For convenience, Flink provides the following schemas:
     The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
     an optional "metadata" field that exposes the offset/partition/topic for this message.
 
+### Kafka Consumers Start Position Configuration
+
+The Flink Kafka Consumer allows configuring how the start positions for Kafka
+partitions are determined.
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
+myConsumer.setStartFromEarliest();     // start from the earliest record possible
+myConsumer.setStartFromLatest();       // start from the latest record
+myConsumer.setStartFromGroupOffsets(); // the default behaviour
+
+DataStream<String> stream = env.addSource(myConsumer);
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val myConsumer = new FlinkKafkaConsumer08[String](...)
+myConsumer.setStartFromEarliest()      // start from the earliest record possible
+myConsumer.setStartFromLatest()        // start from the latest record
+myConsumer.setStartFromGroupOffsets()  // the default behaviour
+
+val stream = env.addSource(myConsumer)
+...
+{% endhighlight %}
+</div>
+</div>
+
+All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position.
+
+ * `setStartFromGroupOffsets()` (default behaviour): Start reading partitions from
+ the consumer group's (`group.id` setting in the consumer properties) committed
+ offsets in Kafka brokers (or Zookeeper for Kafka 0.8). If offsets could not be
+ found for a partition, the `auto.offset.reset` setting in the properties will be used.
+ * `setStartFromEarliest()` / `setStartFromLatest()`: Start from the earliest / latest
+ record. Under these modes, committed offsets in Kafka will be ignored and
+ not used as starting positions.
+ 
+Note that these settings do not affect the start position when the job is
+automatically restored from a failure or manually restored using a savepoint.
+On restore, the start position of each Kafka partition is determined by the
+offsets stored in the savepoint or checkpoint
+(please see the next section for information about checkpointing to enable
+fault tolerance for the consumer).
+
 ### Kafka Consumers and Fault Tolerance
 
 With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
 its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore
-the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that where
+the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were
 stored in the checkpoint.
 
 The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
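
For context on the note and fault-tolerance section above, here is a minimal sketch (outside the patch itself) of how the new start-position methods interact with checkpointing, using the standard DataStream API; the topic name, addresses, and checkpoint interval are placeholder values:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// draw a checkpoint every 5 seconds; on restore, the consumer resumes from the
// offsets stored in the checkpoint, regardless of the configured start position
env.enableCheckpointing(5000);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181"); // needed for the 0.8 consumer
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
        new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.setStartFromEarliest(); // only applies on a fresh start, not on restore

DataStream<String> stream = env.addSource(myConsumer);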

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index a9ce336..3a58216 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW
 import org.apache.flink.util.SerializedValue;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 
@@ -128,6 +129,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 	protected AbstractFetcher<T, ?> createFetcher(
 			SourceContext<T> sourceContext,
 			List<KafkaTopicPartition> thisSubtaskPartitions,
+			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext) throws Exception {
@@ -137,6 +139,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 		return new Kafka010Fetcher<>(
 				sourceContext,
 				thisSubtaskPartitions,
+				restoredSnapshotState,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				runtimeContext.getProcessingTimeService(),
@@ -148,6 +151,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 				deserializer,
 				properties,
 				pollTimeout,
+				startupMode,
 				useMetrics);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 71dd29a..efb6f88 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -31,6 +32,7 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 
@@ -47,6 +49,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 	public Kafka010Fetcher(
 			SourceContext<T> sourceContext,
 			List<KafkaTopicPartition> assignedPartitions,
+			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			ProcessingTimeService processingTimeProvider,
@@ -58,11 +61,13 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
+			StartupMode startupMode,
 			boolean useMetrics) throws Exception
 	{
 		super(
 				sourceContext,
 				assignedPartitions,
+				restoredSnapshotState,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				processingTimeProvider,
@@ -74,6 +79,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 				deserializer,
 				kafkaProperties,
 				pollTimeout,
+				startupMode,
 				useMetrics);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
index a81b098..1e0bc5b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
@@ -37,4 +37,14 @@ public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
 	public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
 		consumer.assign(topicPartitions);
 	}
+
+	@Override
+	public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
+		consumer.seekToBeginning(partitions);
+	}
+
+	@Override
+	public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
+		consumer.seekToEnd(partitions);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 6ee0429..3bc154e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -22,9 +22,9 @@ import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internal.Handover;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -122,20 +122,22 @@ public class Kafka010FetcherTest {
         KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
         final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                sourceContext,
-                topics,
-                null, /* periodic assigner */
-                null, /* punctuated assigner */
-                new TestProcessingTimeService(),
-                10,
-                getClass().getClassLoader(),
-                false, /* checkpointing */
-                "taskname-with-subtask",
-                new UnregisteredMetricsGroup(),
-                schema,
-                new Properties(),
-                0L,
-                false);
+        		sourceContext,
+				topics,
+				null, /* no restored state */
+				null, /* periodic assigner */
+				null, /* punctuated assigner */
+				new TestProcessingTimeService(),
+				10,
+				getClass().getClassLoader(),
+				false, /* checkpointing */
+				"taskname-with-subtask",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				StartupMode.GROUP_OFFSETS,
+				false);
 
         // ----- run the fetcher -----
 
@@ -256,23 +258,24 @@ public class Kafka010FetcherTest {
         SourceContext<String> sourceContext = mock(SourceContext.class);
         List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
         KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-        StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
 
         final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                sourceContext,
-                topics,
-                null, /* periodic assigner */
-                null, /* punctuated assigner */
-                new TestProcessingTimeService(),
-                10,
-                getClass().getClassLoader(),
-                false, /* checkpointing */
-                "taskname-with-subtask",
-                new UnregisteredMetricsGroup(),
-                schema,
-                new Properties(),
-                0L,
-                false);
+        		sourceContext,
+				topics,
+				null, /* no restored state */
+				null, /* periodic assigner */
+				null, /* punctuated assigner */
+				new TestProcessingTimeService(),
+				10,
+				getClass().getClassLoader(),
+				false, /* checkpointing */
+				"taskname-with-subtask",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				StartupMode.GROUP_OFFSETS,
+				false);
 
 
         // ----- run the fetcher -----
@@ -374,20 +377,22 @@ public class Kafka010FetcherTest {
         KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
         final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                sourceContext,
-                topics,
-                null, /* periodic watermark extractor */
-                null, /* punctuated watermark extractor */
-                new TestProcessingTimeService(),
-                10, /* watermark interval */
-                this.getClass().getClassLoader(),
-                true, /* checkpointing */
-                "task_name",
-                new UnregisteredMetricsGroup(),
-                schema,
-                new Properties(),
-                0L,
-                false);
+				sourceContext,
+				topics,
+				null, /* no restored state */
+				null, /* periodic watermark extractor */
+				null, /* punctuated watermark extractor */
+				new TestProcessingTimeService(),
+				10, /* watermark interval */
+				this.getClass().getClassLoader(),
+				true, /* checkpointing */
+				"task_name",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				StartupMode.GROUP_OFFSETS,
+				false);
 
 
         // ----- run the fetcher -----

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 08511c9..a375fb6 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -51,7 +51,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 	//  Suite of Tests
 	// ------------------------------------------------------------------------
 
-
 	@Test(timeout = 60000)
 	public void testFailOnNoBroker() throws Exception {
 		runFailOnNoBrokerTest();
@@ -131,6 +130,24 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		runEndOfStreamTest();
 	}
 
+	// --- startup mode ---
+
+	@Test(timeout = 60000)
+	public void testStartFromEarliestOffsets() throws Exception {
+		runStartFromEarliestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromLatestOffsets() throws Exception {
+		runStartFromLatestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromGroupOffsets() throws Exception {
+		runStartFromGroupOffsets();
+	}
+
+
 	// --- offset committing ---
 
 	@Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index f15fd45..bc1faaf 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -47,6 +47,8 @@ import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -129,8 +131,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public KafkaOffsetHandler createOffsetHandler(Properties props) {
-		return new KafkaOffsetHandlerImpl(props);
+	public KafkaOffsetHandler createOffsetHandler() {
+		return new KafkaOffsetHandlerImpl();
 	}
 
 	@Override
@@ -401,7 +403,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 		private final KafkaConsumer<byte[], byte[]> offsetClient;
 
-		public KafkaOffsetHandlerImpl(Properties props) {
+		public KafkaOffsetHandlerImpl() {
+			Properties props = new Properties();
+			props.putAll(standardProps);
+			props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+			props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
 			offsetClient = new KafkaConsumer<>(props);
 		}
 
@@ -412,6 +419,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		}
 
 		@Override
+		public void setCommittedOffset(String topicName, int partition, long offset) {
+			Map<TopicPartition, OffsetAndMetadata> partitionAndOffset = new HashMap<>();
+			partitionAndOffset.put(new TopicPartition(topicName, partition), new OffsetAndMetadata(offset));
+			offsetClient.commitSync(partitionAndOffset);
+		}
+
+		@Override
 		public void close() {
 			offsetClient.close();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 0f11c72..c0e4dd7 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
 import kafka.common.ErrorMapping;
 import kafka.javaapi.PartitionMetadata;
@@ -49,6 +48,7 @@ import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.HashMap;
 import java.util.Properties;
 import java.util.Random;
 
@@ -112,9 +112,6 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	/** The properties to parametrize the Kafka consumer and ZooKeeper client */ 
 	private final Properties kafkaProperties;
 
-	/** The behavior when encountering an invalid offset (see {@link OffsetRequest}) */
-	private final long invalidOffsetBehavior;
-
 	/** The interval in which to automatically commit (-1 if deactivated) */
 	private final long autoCommitInterval;
 
@@ -188,7 +185,9 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 		// validate the zookeeper properties
 		validateZooKeeperConfig(props);
 
-		this.invalidOffsetBehavior = getInvalidOffsetBehavior(props);
+		// eagerly check for invalid "auto.offset.reset" values before launching the job
+		validateAutoOffsetResetValue(props);
+
 		this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000);
 	}
 
@@ -196,16 +195,18 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	protected AbstractFetcher<T, ?> createFetcher(
 			SourceContext<T> sourceContext,
 			List<KafkaTopicPartition> thisSubtaskPartitions,
+			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext) throws Exception {
 
 		boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false"));
 
-		return new Kafka08Fetcher<>(sourceContext, thisSubtaskPartitions,
+		return new Kafka08Fetcher<>(sourceContext,
+				thisSubtaskPartitions, restoredSnapshotState,
 				watermarksPeriodic, watermarksPunctuated,
 				runtimeContext, deserializer, kafkaProperties,
-				invalidOffsetBehavior, autoCommitInterval, useMetrics);
+				autoCommitInterval, startupMode, useMetrics);
 	}
 
 	@Override
@@ -384,16 +385,19 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 		}
 	}
 
-	private static long getInvalidOffsetBehavior(Properties config) {
+	/**
+	 * Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting
+	 * the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception
+	 * right after a task is started.
+	 *
+	 * @param config kafka consumer properties to check
+	 */
+	private static void validateAutoOffsetResetValue(Properties config) {
 		final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
-		if (val.equals("none")) {
+		if (!(val.equals("largest") || val.equals("latest") || val.equals("earliest") || val.equals("smallest"))) {
+			// largest/smallest is kafka 0.8, latest/earliest is kafka 0.9
 			throw new IllegalArgumentException("Cannot use '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
-					+ "' value 'none'. Possible values: 'latest', 'largest', or 'earliest'.");
-		}
-		else if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9
-			return OffsetRequest.LatestTime();
-		} else {
-			return OffsetRequest.EarliestTime();
+				+ "' value '" + val + "'. Possible values: 'latest', 'largest', 'earliest', or 'smallest'.");
 		}
 	}
 }
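
As a small illustration (not part of the diff) of the eager validation added above: with consumer properties like the following, an unsupported "auto.offset.reset" value is now rejected when the consumer object is constructed, before the job is ever submitted. The property values are placeholders:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "test");
props.setProperty("auto.offset.reset", "none"); // only largest/latest/earliest/smallest are accepted

// throws IllegalArgumentException in the FlinkKafkaConsumer08 constructor
FlinkKafkaConsumer08<String> consumer =
        new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), props);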

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 5a0aed3..ad520d8 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import kafka.api.OffsetRequest;
 import kafka.common.TopicAndPartition;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Node;
 
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -89,29 +92,32 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	public Kafka08Fetcher(
 			SourceContext<T> sourceContext,
 			List<KafkaTopicPartition> assignedPartitions,
+			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
-			long invalidOffsetBehavior,
 			long autoCommitInterval,
+			StartupMode startupMode,
 			boolean useMetrics) throws Exception
 	{
 		super(
 				sourceContext,
 				assignedPartitions,
+				restoredSnapshotState,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				runtimeContext.getProcessingTimeService(),
 				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 				runtimeContext.getUserCodeClassLoader(),
+				startupMode,
 				useMetrics);
 
 		this.deserializer = checkNotNull(deserializer);
 		this.kafkaConfig = checkNotNull(kafkaProperties);
 		this.runtimeContext = runtimeContext;
-		this.invalidOffsetBehavior = invalidOffsetBehavior;
+		this.invalidOffsetBehavior = getInvalidOffsetBehavior(kafkaProperties);
 		this.autoCommitInterval = autoCommitInterval;
 		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
 
@@ -139,23 +145,44 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 
 		PeriodicOffsetCommitter periodicCommitter = null;
 		try {
-			// read offsets from ZooKeeper for partitions that did not restore offsets
-			{
-				List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
-				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-					if (!partition.isOffsetDefined()) {
-						partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
-					}
-				}
 
-				Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
-				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-					Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition());
-					if (zkOffset != null) {
-						// the offset in ZK represents the "next record to process", so we need to subtract it by 1
-						// to correctly represent our internally checkpointed offsets
-						partition.setOffset(zkOffset - 1);
-					}
+			// if we're not restored from a checkpoint, none of the partitions will have their offsets set;
+			// depending on the configured startup mode, set the starting offsets accordingly
+			if (!isRestored) {
+				switch (startupMode) {
+					case EARLIEST:
+						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+							partition.setOffset(OffsetRequest.EarliestTime());
+						}
+						break;
+					case LATEST:
+						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+							partition.setOffset(OffsetRequest.LatestTime());
+						}
+						break;
+					default:
+					case GROUP_OFFSETS:
+						List<KafkaTopicPartition> partitions = new ArrayList<>(subscribedPartitions().length);
+						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+							partitions.add(partition.getKafkaTopicPartition());
+						}
+
+						Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitions);
+						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+							Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
+							if (offset != null) {
+								// the committed offset in ZK represents the next record to process,
+								// so we subtract it by 1 to correctly represent internal state
+								partition.setOffset(offset - 1);
+							} else {
+								// if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
+								// we default to "auto.offset.reset" like the Kafka high-level consumer
+								LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
+									" resetting starting offset to 'auto.offset.reset'", partition);
+
+								partition.setOffset(invalidOffsetBehavior);
+							}
+						}
 				}
 			}
 
@@ -487,4 +514,21 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 
 		return leaderToPartitions;
 	}
+
+	/**
+	 * Retrieve the behaviour of "auto.offset.reset" from the config properties.
+	 * A partition needs to fall back to "auto.offset.reset" as default offset when
+	 * we can't find offsets in ZK to start from in {@link StartupMode#GROUP_OFFSETS} startup mode.
+	 *
+	 * @param config kafka consumer properties
+	 * @return either OffsetRequest.LatestTime() or OffsetRequest.EarliestTime()
+	 */
+	private static long getInvalidOffsetBehavior(Properties config) {
+		final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
+		if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9
+			return OffsetRequest.LatestTime();
+		} else {
+			return OffsetRequest.EarliestTime();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 35e491a..e9cfdac 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka.internals;
 
 import kafka.api.FetchRequestBuilder;
+import kafka.api.OffsetRequest;
 import kafka.api.PartitionOffsetRequestInfo;
 import kafka.common.ErrorMapping;
 import kafka.common.TopicAndPartition;
@@ -110,6 +111,8 @@ class SimpleConsumerThread<T> extends Thread {
 		this.owner = owner;
 		this.errorHandler = errorHandler;
 		this.broker = broker;
+		// all partitions should have been assigned a starting offset by the fetcher
+		checkAllPartitionsHaveDefinedStartingOffsets(seedPartitions);
 		this.partitions = seedPartitions;
 		this.deserializer = requireNonNull(deserializer);
 		this.unassignedPartitions = requireNonNull(unassignedPartitions);
@@ -144,10 +147,10 @@ class SimpleConsumerThread<T> extends Thread {
 			// create the Kafka consumer that we actually use for fetching
 			consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
 			
-			// make sure that all partitions have some offsets to start with
-			// those partitions that do not have an offset from a checkpoint need to get
-			// their start offset from ZooKeeper
-			getMissingOffsetsFromKafka(partitions);
+			// replace earliest or latest starting offsets with actual offset values fetched from Kafka
+			requestAndSetEarliestOrLatestOffsetsFromKafka(consumer, partitions);
+
+			LOG.info("Starting to consume {} partitions with consumer thread {}", partitions.size(), getName());
 
 			// Now, the actual work starts :-)
 			int offsetOutOfRangeCount = 0;
@@ -160,9 +163,12 @@ class SimpleConsumerThread<T> extends Thread {
 				List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch();
 				if (newPartitions != null) {
 					// found some new partitions for this thread's broker
-					
-					// check if the new partitions need an offset lookup
-					getMissingOffsetsFromKafka(newPartitions);
+
+					// the new partitions should already be assigned a starting offset
+					checkAllPartitionsHaveDefinedStartingOffsets(newPartitions);
+					// if the new partitions are to start from earliest or latest offsets,
+					// we need to replace them with actual values from Kafka
+					requestAndSetEarliestOrLatestOffsetsFromKafka(consumer, newPartitions);
 					
 					// add the new partitions (and check they are not already in there)
 					for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
@@ -300,7 +306,7 @@ class SimpleConsumerThread<T> extends Thread {
 						}
 						// get valid offsets for these partitions and try again.
 						LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
-						getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
+						requestAndSetSpecificTimeOffsetsFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
 						
 						LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
 						continue; // jump back to create a new fetch request. The offset has not been touched.
@@ -408,26 +414,6 @@ class SimpleConsumerThread<T> extends Thread {
 		}
 	}
 
-	private void getMissingOffsetsFromKafka(
-			List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
-	{
-		// collect which partitions we should fetch offsets for
-		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
-		for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
-			if (!part.isOffsetDefined()) {
-				// retrieve the offset from the consumer
-				partitionsToGetOffsetsFor.add(part);
-			}
-		}
-		
-		if (partitionsToGetOffsetsFor.size() > 0) {
-			getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
-			
-			LOG.info("No checkpoint/savepoint offsets found for some partitions. " +
-					"Fetched the following start offsets {}", partitionsToGetOffsetsFor);
-		}
-	}
-
 	/**
 	 * Cancels this fetch thread. The thread will release all resources and terminate.
 	 */
@@ -447,15 +433,13 @@ class SimpleConsumerThread<T> extends Thread {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Request latest offsets for a set of partitions, via a Kafka consumer.
-	 *
-	 * <p>This method retries three times if the response has an error.
+	 * Request offsets before a specific time for a set of partitions, via a Kafka consumer.
 	 *
 	 * @param consumer The consumer connected to lead broker
 	 * @param partitions The list of partitions we need offsets for
 	 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
 	 */
-	private static void getLastOffsetFromKafka(
+	private static void requestAndSetSpecificTimeOffsetsFromKafka(
 			SimpleConsumer consumer,
 			List<KafkaTopicPartitionState<TopicAndPartition>> partitions,
 			long whichTime) throws IOException
@@ -465,26 +449,65 @@ class SimpleConsumerThread<T> extends Thread {
 			requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1));
 		}
 
+		requestAndSetOffsetsFromKafka(consumer, partitions, requestInfo);
+	}
+
+	/**
+	 * For a set of partitions, if a partition is set with the special offsets {@link OffsetRequest#EarliestTime()}
+	 * or {@link OffsetRequest#LatestTime()}, replace them with actual offsets requested via a Kafka consumer.
+	 *
+	 * @param consumer The consumer connected to lead broker
+	 * @param partitions The list of partitions we need offsets for
+	 */
+	private static void requestAndSetEarliestOrLatestOffsetsFromKafka(
+			SimpleConsumer consumer,
+			List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws Exception
+	{
+		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+		for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+			if (part.getOffset() == OffsetRequest.EarliestTime() || part.getOffset() == OffsetRequest.LatestTime()) {
+				requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(part.getOffset(), 1));
+			}
+		}
+
+		requestAndSetOffsetsFromKafka(consumer, partitions, requestInfo);
+	}
+
+	/**
+	 * Request offsets from Kafka with a specified set of partitions' offset request information.
+	 * The returned offsets are used to set the internal partition states.
+	 *
+	 * <p>This method retries three times if the response has an error.
+	 *
+	 * @param consumer The consumer connected to lead broker
+	 * @param partitionStates the partition states, will be set with offsets fetched from Kafka request
+	 * @param partitionToRequestInfo map of each partition to its offset request info
+	 */
+	private static void requestAndSetOffsetsFromKafka(
+			SimpleConsumer consumer,
+			List<KafkaTopicPartitionState<TopicAndPartition>> partitionStates,
+			Map<TopicAndPartition, PartitionOffsetRequestInfo> partitionToRequestInfo) throws IOException
+	{
 		int retries = 0;
 		OffsetResponse response;
 		while (true) {
 			kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
-					requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+				partitionToRequestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
 			response = consumer.getOffsetsBefore(request);
 
 			if (response.hasError()) {
 				StringBuilder exception = new StringBuilder();
-				for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+				for (KafkaTopicPartitionState<TopicAndPartition> part : partitionStates) {
 					short code;
 					if ((code = response.errorCode(part.getTopic(), part.getPartition())) != ErrorMapping.NoError()) {
 						exception.append("\nException for topic=").append(part.getTopic())
-								.append(" partition=").append(part.getPartition()).append(": ")
-								.append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
+							.append(" partition=").append(part.getPartition()).append(": ")
+							.append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
 					}
 				}
 				if (++retries >= 3) {
-					throw new IOException("Unable to get last offset for partitions " + partitions + ": "
-							+ exception.toString());
+					throw new IOException("Unable to get last offset for partitions " + partitionStates + ": "
+						+ exception.toString());
 				} else {
 					LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception);
 				}
@@ -493,12 +516,25 @@ class SimpleConsumerThread<T> extends Thread {
 			}
 		}
 
-		for (KafkaTopicPartitionState<TopicAndPartition> part: partitions) {
-			final long offset = response.offsets(part.getTopic(), part.getPartition())[0];
-			
-			// the offset returned is that of the next record to fetch. because our state reflects the latest
-			// successfully emitted record, we subtract one
-			part.setOffset(offset - 1);
+		for (KafkaTopicPartitionState<TopicAndPartition> part: partitionStates) {
+			// there will be offsets only for partitions that were requested for
+			if (partitionToRequestInfo.containsKey(part.getKafkaPartitionHandle())) {
+				final long offset = response.offsets(part.getTopic(), part.getPartition())[0];
+
+				// the offset returned is that of the next record to fetch. because our state reflects the latest
+				// successfully emitted record, we subtract one
+				part.setOffset(offset - 1);
+			}
+		}
+	}
+
+	private static void checkAllPartitionsHaveDefinedStartingOffsets(
+		List<KafkaTopicPartitionState<TopicAndPartition>> partitions)
+	{
+		for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+			if (!part.isOffsetDefined()) {
+				throw new IllegalArgumentException("SimpleConsumerThread received a partition with undefined starting offset");
+			}
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 0cdf465..334bd2b 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 
 import org.junit.Test;
@@ -81,9 +82,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 	@Test(timeout = 60000)
 	public void testInvalidOffset() throws Exception {
-		
+
 		final int parallelism = 1;
-		
+
 		// write 20 messages into topic:
 		final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1);
 
@@ -98,8 +99,8 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.getConfig().disableSysoutLogging();
-		
-		readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);
+
+		readSequence(env, StartupMode.GROUP_OFFSETS, standardProps, parallelism, topic, valuesCount, startFrom);
 
 		deleteTestTopic(topic);
 	}
@@ -128,6 +129,23 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		runBrokerFailureTest();
 	}
 
+	// --- startup mode ---
+
+	@Test(timeout = 60000)
+	public void testStartFromEarliestOffsets() throws Exception {
+		runStartFromEarliestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromLatestOffsets() throws Exception {
+		runStartFromLatestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromGroupOffsets() throws Exception {
+		runStartFromGroupOffsets();
+	}
+
 	// --- offset committing ---
 
 	@Test(timeout = 60000)
@@ -192,7 +210,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		readProps.setProperty("auto.commit.interval.ms", "500");
 
 		// read so that the offset can be committed to ZK
-		readSequence(env, readProps, parallelism, topicName, 100, 0);
+		readSequence(env, StartupMode.GROUP_OFFSETS, readProps, parallelism, topicName, 100, 0);
 
 		// get the offset
 		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 6235449..6c2672a 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -33,7 +33,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
@@ -128,8 +127,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public KafkaOffsetHandler createOffsetHandler(Properties props) {
-		return new KafkaOffsetHandlerImpl(props);
+	public KafkaOffsetHandler createOffsetHandler() {
+		return new KafkaOffsetHandlerImpl();
 	}
 
 	@Override
@@ -378,9 +377,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		private final CuratorFramework offsetClient;
 		private final String groupId;
 
-		public KafkaOffsetHandlerImpl(Properties props) {
+		public KafkaOffsetHandlerImpl() {
 			offsetClient = createCuratorClient();
-			groupId = props.getProperty("group.id");
+			groupId = standardProps.getProperty("group.id");
 		}
 
 		@Override
@@ -393,6 +392,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		}
 
 		@Override
+		public void setCommittedOffset(String topicName, int partition, long offset) {
+			try {
+				ZookeeperOffsetHandler.setOffsetInZooKeeper(offsetClient, groupId, topicName, partition, offset);
+			} catch (Exception e) {
+				throw new RuntimeException("Exception when writing offsets to Zookeeper", e);
+			}
+		}
+
+		@Override
 		public void close() {
 			offsetClient.close();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 2b816c4..9a61b91 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.HashMap;
 import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -171,6 +172,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 	protected AbstractFetcher<T, ?> createFetcher(
 			SourceContext<T> sourceContext,
 			List<KafkaTopicPartition> thisSubtaskPartitions,
+			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext) throws Exception {
@@ -180,6 +182,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 		return new Kafka09Fetcher<>(
 				sourceContext,
 				thisSubtaskPartitions,
+				restoredSnapshotState,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				runtimeContext.getProcessingTimeService(),
@@ -191,6 +194,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 				deserializer,
 				properties,
 				pollTimeout,
+				startupMode,
 				useMetrics);
 		
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index d495327..b7c9bc2 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
@@ -71,6 +72,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 	public Kafka09Fetcher(
 			SourceContext<T> sourceContext,
 			List<KafkaTopicPartition> assignedPartitions,
+			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			ProcessingTimeService processingTimeProvider,
@@ -82,16 +84,19 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
+			StartupMode startupMode,
 			boolean useMetrics) throws Exception
 	{
 		super(
 				sourceContext,
 				assignedPartitions,
+				restoredSnapshotState,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				processingTimeProvider,
 				autoWatermarkInterval,
 				userCodeClassLoader,
+				startupMode,
 				useMetrics);
 
 		this.deserializer = deserializer;
@@ -114,6 +119,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 				createCallBridge(),
 				getFetcherName() + " for " + taskNameWithSubtasks,
 				pollTimeout,
+				startupMode,
+				isRestored,
 				useMetrics);
 	}
 
@@ -141,7 +148,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 							records.records(partition.getKafkaPartitionHandle());
 
 					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-
 						final T value = deserializer.deserialize(
 								record.key(), record.value(),
 								record.topic(), record.partition(), record.offset());

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
index c17aae6..a97b3cf 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -29,7 +29,7 @@ import java.util.List;
  * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
  * for example changing {@code assign(List)} to {@code assign(Collection)}.
  * 
- * Because of that, we need to two versions whose compiled code goes against different method signatures.
+ * Because of that, we need to have two versions whose compiled code goes against different method signatures.
  * Even though the source of subclasses may look identical, the byte code will be different, because they
  * are compiled against different dependencies.
  */
@@ -38,4 +38,17 @@ public class KafkaConsumerCallBridge {
 	public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
 		consumer.assign(topicPartitions);
 	}
+
+	public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
+		for (TopicPartition partition : partitions) {
+			consumer.seekToBeginning(partition);
+		}
+	}
+
+	public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
+		for (TopicPartition partition : partitions) {
+			consumer.seekToEnd(partition);
+		}
+	}
+
 }
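
The 0.10-specific counterpart of these new seek methods is added in KafkaConsumerCallBridge010 (see the file summary at the top of this commit). As a rough sketch, assuming Kafka 0.10's Collection-based seek API and an illustrative class name (this is not the committed source), the override boils down to passing the whole partition list in a single call:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.util.List;

    // Illustrative sketch only; the committed class is KafkaConsumerCallBridge010.
    public class KafkaConsumerCallBridge010Sketch extends KafkaConsumerCallBridge {

        @Override
        public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
            // Kafka 0.10+ accepts a Collection instead of varargs, so no per-partition loop is needed
            consumer.seekToBeginning(partitions);
        }

        @Override
        public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
            consumer.seekToEnd(partitions);
        }
    }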

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index 9cfa840..03fe2c6 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka.internal;
 
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -79,6 +80,12 @@ public class KafkaConsumerThread extends Thread {
 	/** The maximum number of milliseconds to wait for a fetch batch */
 	private final long pollTimeout;
 
+	/** The configured startup mode (relevant only if we're not restored from a checkpoint / savepoint) */
+	private final StartupMode startupMode;
+
+	/** Flag whether or not we're restored from checkpoint / savepoint */
+	private final boolean isRestored;
+
 	/** Flag whether to add Kafka's metrics to the Flink metrics */
 	private final boolean useMetrics;
 
@@ -101,6 +108,8 @@ public class KafkaConsumerThread extends Thread {
 			KafkaConsumerCallBridge consumerCallBridge,
 			String threadName,
 			long pollTimeout,
+			StartupMode startupMode,
+			boolean isRestored,
 			boolean useMetrics) {
 
 		super(threadName);
@@ -109,9 +118,24 @@ public class KafkaConsumerThread extends Thread {
 		this.log = checkNotNull(log);
 		this.handover = checkNotNull(handover);
 		this.kafkaProperties = checkNotNull(kafkaProperties);
-		this.subscribedPartitions = checkNotNull(subscribedPartitions);
 		this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
 		this.consumerCallBridge = checkNotNull(consumerCallBridge);
+		this.startupMode = checkNotNull(startupMode);
+
+		this.subscribedPartitions = checkNotNull(subscribedPartitions);
+		this.isRestored = isRestored;
+
+		// if we are restoring from a checkpoint / savepoint, all
+		// subscribed partitions' state should have defined offsets
+		if (isRestored) {
+			for (KafkaTopicPartitionState<TopicPartition> subscribedPartition : subscribedPartitions) {
+				if (!subscribedPartition.isOffsetDefined()) {
+					throw new IllegalArgumentException("Restoring from a checkpoint / savepoint, but found a " +
+						"partition state " + subscribedPartition + " that does not have a defined offset.");
+				}
+			}
+		}
+
 		this.pollTimeout = pollTimeout;
 		this.useMetrics = useMetrics;
 
@@ -171,28 +195,39 @@ public class KafkaConsumerThread extends Thread {
 				return;
 			}
 
-			// seek the consumer to the initial offsets
-			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
-				if (partition.isOffsetDefined()) {
-					log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " +
-							"seeking the consumer to position {}",
-							partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
+			if (isRestored) {
+				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
+					log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
+						"to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
 
 					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
 				}
-				else {
-					// for partitions that do not have offsets restored from a checkpoint/savepoint,
-					// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
-					// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
-
-					long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
-
-					log.info("Partition {} has no initial offset; the consumer has position {}, " +
-							"so the initial offset will be set to {}",
-							partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
+			} else {
+				List<TopicPartition> partitionList = convertKafkaPartitions(subscribedPartitions);
+
+				// fetch offsets from Kafka, depending on the configured startup mode
+				switch (startupMode) {
+					case EARLIEST:
+						log.info("Setting starting point as earliest offset for partitions {}", partitionList);
+
+						consumerCallBridge.seekPartitionsToBeginning(consumer, partitionList);
+						break;
+					case LATEST:
+						log.info("Setting starting point as latest offset for partitions {}", partitionList);
+
+						consumerCallBridge.seekPartitionsToEnd(consumer, partitionList);
+						break;
+					default:
+					case GROUP_OFFSETS:
+						log.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}",
+							kafkaProperties.getProperty("group.id"), partitionList);
+				}
 
+				// on startup, all partition states will not have defined offsets;
+				// set the initial states with the offsets fetched from Kafka
+				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
 					// the fetched offset represents the next record to process, so we need to subtract 1 from it
-					partition.setOffset(fetchedOffset - 1);
+					partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
 				}
 			}
 
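The +1 / -1 adjustments above follow one convention: Flink's per-partition state stores the offset of the last record that was emitted, while Kafka's seek() and position() work with the offset of the next record to read. A tiny illustration with made-up offsets (hypothetical values, not taken from this commit):

    // Hypothetical offsets, illustrating the bookkeeping convention used above.
    public class OffsetConventionIllustration {
        public static void main(String[] args) {
            long restoredOffset = 42L;                 // last record emitted before the checkpoint
            long seekTarget = restoredOffset + 1;      // consumer.seek(partition, 43): resume right after it

            long fetchedPosition = 100L;               // consumer.position(partition) after the startup seek
            long initialState = fetchedPosition - 1;   // partition.setOffset(99): checkpointed and committed later

            System.out.println(seekTarget + ", " + initialState);   // prints: 43, 99
        }
    }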

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 7a82365..4526aa0 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internal.Handover;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -123,6 +124,7 @@ public class Kafka09FetcherTest {
 		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
 				sourceContext,
 				topics,
+				null, /* no restored state */
 				null, /* periodic watermark extractor */
 				null, /* punctuated watermark extractor */
 				new TestProcessingTimeService(),
@@ -134,6 +136,7 @@ public class Kafka09FetcherTest {
 				schema,
 				new Properties(),
 				0L,
+				StartupMode.GROUP_OFFSETS,
 				false);
 
 		// ----- run the fetcher -----
@@ -259,6 +262,7 @@ public class Kafka09FetcherTest {
 		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
 				sourceContext,
 				topics,
+				null, /* no restored state */
 				null, /* periodic watermark extractor */
 				null, /* punctuated watermark extractor */
 				new TestProcessingTimeService(),
@@ -270,9 +274,9 @@ public class Kafka09FetcherTest {
 				schema,
 				new Properties(),
 				0L,
+				StartupMode.GROUP_OFFSETS,
 				false);
 
-
 		// ----- run the fetcher -----
 
 		final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -374,6 +378,7 @@ public class Kafka09FetcherTest {
 		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
 				sourceContext,
 				topics,
+				null, /* no restored state */
 				null, /* periodic watermark extractor */
 				null, /* punctuated watermark extractor */
 				new TestProcessingTimeService(),
@@ -385,6 +390,7 @@ public class Kafka09FetcherTest {
 				schema,
 				new Properties(),
 				0L,
+				StartupMode.GROUP_OFFSETS,
 				false);
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index d18e2a9..6added7 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -110,6 +110,24 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 		runMetricsTest();
 	}
 
+	// --- startup mode ---
+
+	@Test(timeout = 60000)
+	public void testStartFromEarliestOffsets() throws Exception {
+		runStartFromEarliestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromLatestOffsets() throws Exception {
+		runStartFromLatestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromGroupOffsets() throws Exception {
+		runStartFromGroupOffsets();
+	}
+
+
 	// --- offset committing ---
 
 	@Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 1802e0c..99c11c4 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -48,6 +48,8 @@ import scala.collection.Seq;
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -118,8 +120,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public KafkaOffsetHandler createOffsetHandler(Properties props) {
-		return new KafkaOffsetHandlerImpl(props);
+	public KafkaOffsetHandler createOffsetHandler() {
+		return new KafkaOffsetHandlerImpl();
 	}
 
 	@Override
@@ -420,7 +422,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 		private final KafkaConsumer<byte[], byte[]> offsetClient;
 
-		public KafkaOffsetHandlerImpl(Properties props) {
+		public KafkaOffsetHandlerImpl() {
+			Properties props = new Properties();
+			props.putAll(standardProps);
+			props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+			props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
 			offsetClient = new KafkaConsumer<>(props);
 		}
 
@@ -431,6 +438,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		}
 
 		@Override
+		public void setCommittedOffset(String topicName, int partition, long offset) {
+			Map<TopicPartition, OffsetAndMetadata> partitionAndOffset = new HashMap<>();
+			partitionAndOffset.put(new TopicPartition(topicName, partition), new OffsetAndMetadata(offset));
+			offsetClient.commitSync(partitionAndOffset);
+		}
+
+		@Override
 		public void close() {
 			offsetClient.close();
 		}

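The new setCommittedOffset hook is what lets the integration tests seed group offsets before a job starts; the committed tests live in KafkaConsumerTestBase (see runStartFromGroupOffsets in the file summary above). A rough fragment of how such a test could use it, assuming the kafkaServer test environment field used by those tests (illustrative, not the committed test code):

    // Illustrative fragment: pre-commit group offsets, then expect a consumer that is left
    // at the default GROUP_OFFSETS startup mode to resume from exactly these positions.
    KafkaTestEnvironment.KafkaOffsetHandler offsetHandler = kafkaServer.createOffsetHandler();
    offsetHandler.setCommittedOffset("test-topic", 0, 23L);   // partition 0 resumes at offset 23
    offsetHandler.setCommittedOffset("test-topic", 1, 31L);   // partition 1 resumes at offset 31
    // ... run a job that reads "test-topic" with setStartFromGroupOffsets() and assert
    //     that records before the committed offsets are not read again ...
    offsetHandler.close();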
http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 2918080..1121d1b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -36,6 +37,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
@@ -89,7 +91,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	protected final KeyedDeserializationSchema<T> deserializer;
 
 	/** The set of topic partitions that the source will read */
-	protected List<KafkaTopicPartition> subscribedPartitions;
+	private List<KafkaTopicPartition> subscribedPartitions;
 	
 	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
 	 * to exploit per-partition timestamp characteristics.
@@ -103,6 +105,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 	private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
 
+	/** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}) */
+	protected StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+
 	// ------------------------------------------------------------------------
 	//  runtime state (used individually by each parallel subtask) 
 	// ------------------------------------------------------------------------
@@ -114,7 +119,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
 	
 	/** The offsets to restore to, if the consumer restores state from a checkpoint */
-	private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset;
+	private transient volatile HashMap<KafkaTopicPartition, Long> restoredState;
 	
 	/** Flag indicating whether the consumer is still running **/
 	private volatile boolean running = true;
@@ -218,6 +223,41 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		}
 	}
 
+	/**
+	 * Configures the consumer to start reading from the earliest offset for all partitions.
+	 * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * @return The consumer object, to allow function chaining.
+	 */
+	public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
+		this.startupMode = StartupMode.EARLIEST;
+		return this;
+	}
+
+	/**
+	 * Configures the consumer to start reading from the latest offset for all partitions.
+	 * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * @return The consumer object, to allow function chaining.
+	 */
+	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
+		this.startupMode = StartupMode.LATEST;
+		return this;
+	}
+
+	/**
+	 * Configures the consumer to start reading from any committed group offsets found
+	 * in Zookeeper / Kafka brokers. The "group.id" property must be set in the configuration
+	 * properties. If no offset can be found for a partition, the behaviour configured via the
+	 * "auto.offset.reset" property will be used for that partition.
+	 *
+	 * @return The consumer object, to allow function chaining.
+	 */
+	public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
+		this.startupMode = StartupMode.GROUP_OFFSETS;
+		return this;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Work methods
 	// ------------------------------------------------------------------------
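
Put together, the three setters above form the new user-facing startup-mode API. A minimal usage sketch, assuming the 0.9 consumer, a SimpleStringSchema, and placeholder topic / broker / group names:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    import java.util.Properties;

    public class StartupModeUsageSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "test");

            FlinkKafkaConsumer09<String> consumer =
                    new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), properties);

            consumer.setStartFromEarliest();          // ignore committed offsets, read from the beginning
            // consumer.setStartFromLatest();         // alternatively: read only records produced from now on
            // consumer.setStartFromGroupOffsets();   // alternatively: the default, committed group offsets

            DataStream<String> stream = env.addSource(consumer);
            stream.print();

            env.execute("startup mode example");
        }
    }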
@@ -231,17 +271,12 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		// we need only do work, if we actually have partitions assigned
 		if (!subscribedPartitions.isEmpty()) {
 
-			// (1) create the fetcher that will communicate with the Kafka brokers
+			// create the fetcher that will communicate with the Kafka brokers
 			final AbstractFetcher<T, ?> fetcher = createFetcher(
-					sourceContext, subscribedPartitions,
+					sourceContext, subscribedPartitions, restoredState,
 					periodicWatermarkAssigner, punctuatedWatermarkAssigner,
 					(StreamingRuntimeContext) getRuntimeContext());
 
-			// (2) set the fetcher to the restored checkpoint offsets
-			if (restoreToOffset != null) {
-				fetcher.restoreOffsets(restoreToOffset);
-			}
-
 			// publish the reference, for snapshot-, commit-, and cancel calls
 			// IMPORTANT: We can only do that now, because only now will calls to
 			//            the fetchers 'snapshotCurrentState()' method return at least
@@ -321,18 +356,18 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
 		if (context.isRestored()) {
-			if (restoreToOffset == null) {
-				restoreToOffset = new HashMap<>();
+			if (restoredState == null) {
+				restoredState = new HashMap<>();
 				for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
-					restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
+					restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
 				}
 
 				LOG.info("Setting restore state in the FlinkKafkaConsumer.");
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Using the following offsets: {}", restoreToOffset);
+					LOG.debug("Using the following offsets: {}", restoredState);
 				}
-			} else if (restoreToOffset.isEmpty()) {
-				restoreToOffset = null;
+			} else if (restoredState.isEmpty()) {
+				restoredState = null;
 			}
 		} else {
 			LOG.info("No restore state for FlinkKafkaConsumer.");
@@ -352,9 +387,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				// the fetcher has not yet been initialized, which means we need to return the
 				// originally restored offsets or the assigned partitions
 
-				if (restoreToOffset != null) {
+				if (restoredState != null) {
 
-					for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
+					for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoredState.entrySet()) {
 						offsetsStateForCheckpoint.add(
 								Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
 					}
@@ -367,7 +402,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 				// the map cannot be asynchronously updated, because only one checkpoint call can happen
 				// on this function at a time: either snapshotState() or notifyCheckpointComplete()
-				pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
+				pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
 			} else {
 				HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
 
@@ -393,11 +428,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
 			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
 
-		restoreToOffset = restoredOffsets;
+		restoredState = restoredOffsets;
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",
-				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
+				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoredState);
 		}
 	}
 
@@ -470,6 +505,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	protected abstract AbstractFetcher<T, ?> createFetcher(
 			SourceContext<T> sourceContext,
 			List<KafkaTopicPartition> thisSubtaskPartitions,
+			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext) throws Exception;
@@ -492,9 +528,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
 		subscribedPartitions = new ArrayList<>();
 
-		if (restoreToOffset != null) {
+		if (restoredState != null) {
 			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
-				if (restoreToOffset.containsKey(kafkaTopicPartition)) {
+				if (restoredState.containsKey(kafkaTopicPartition)) {
 					subscribedPartitions.add(kafkaTopicPartition);
 				}
 			}
@@ -569,4 +605,14 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		
 		logger.info(sb.toString());
 	}
+
+	@VisibleForTesting
+	List<KafkaTopicPartition> getSubscribedPartitions() {
+		return subscribedPartitions;
+	}
+
+	@VisibleForTesting
+	HashMap<KafkaTopicPartition, Long> getRestoredState() {
+		return restoredState;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
new file mode 100644
index 0000000..331c1a6
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.config;
+
+/**
+ * Startup modes for the Kafka Consumer.
+ */
+public enum StartupMode {
+
+	/** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default) */
+	GROUP_OFFSETS,
+
+	/** Start from the earliest offset possible */
+	EARLIEST,
+
+	/** Start from the latest offset */
+	LATEST
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 821eb03..b27e996 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
@@ -67,9 +68,15 @@ public abstract class AbstractFetcher<T, KPH> {
 	/** The mode describing whether the fetcher also generates timestamps and watermarks */
 	protected final int timestampWatermarkMode;
 
+	/** The startup mode for the consumer (only relevant if the consumer wasn't restored) */
+	protected final StartupMode startupMode;
+
 	/** Flag whether to register metrics for the fetcher */
 	protected final boolean useMetrics;
 
+	/** Flag whether or not the consumer state was restored from a checkpoint / savepoint */
+	protected final boolean isRestored;
+
 	/** Only relevant for punctuated watermarks: The current cross partition watermark */
 	private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
 
@@ -78,15 +85,18 @@ public abstract class AbstractFetcher<T, KPH> {
 	protected AbstractFetcher(
 			SourceContext<T> sourceContext,
 			List<KafkaTopicPartition> assignedPartitions,
+			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			ProcessingTimeService processingTimeProvider,
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
+			StartupMode startupMode,
 			boolean useMetrics) throws Exception
 	{
 		this.sourceContext = checkNotNull(sourceContext);
 		this.checkpointLock = sourceContext.getCheckpointLock();
+		this.startupMode = checkNotNull(startupMode);
 		this.useMetrics = useMetrics;
 		
 		// figure out what watermark mode we will be using
@@ -112,6 +122,18 @@ public abstract class AbstractFetcher<T, KPH> {
 				timestampWatermarkMode,
 				watermarksPeriodic, watermarksPunctuated,
 				userCodeClassLoader);
+
+		if (restoredSnapshotState != null) {
+			for (KafkaTopicPartitionState<?> partition : allPartitions) {
+				Long offset = restoredSnapshotState.get(partition.getKafkaTopicPartition());
+				if (offset != null) {
+					partition.setOffset(offset);
+				}
+			}
+			this.isRestored = true;
+		} else {
+			this.isRestored = false;
+		}
 		
 		// if we have periodic watermarks, kick off the interval scheduler
 		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
@@ -192,20 +214,6 @@ public abstract class AbstractFetcher<T, KPH> {
 		return state;
 	}
 
-	/**
-	 * Restores the partition offsets.
-	 * 
-	 * @param snapshotState The offsets for the partitions 
-	 */
-	public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
-		for (KafkaTopicPartitionState<?> partition : allPartitions) {
-			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
-			if (offset != null) {
-				partition.setOffset(offset);
-			}
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	//  emitting records
 	// ------------------------------------------------------------------------