Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/13 15:39:35 UTC

[1/2] flink git commit: [FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization

Repository: flink
Updated Branches:
  refs/heads/master fcd264a70 -> 5f08e5359


[FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization

This commit adds a reimplemented JavaSerializer to be registered with
Kryo. This works around a known issue with Kryo's JavaSerializer, which
may use the wrong classloader for deserialization.

Instead of registering Kryo's JavaSerializer for Throwables, Flink now
registers the reimplemented JavaSerializer. Users who run into
ClassNotFoundExceptions when using Kryo's JavaSerializer for their own
types are also advised to switch to Flink's JavaSerializer.

This closes #3517.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2143172
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2143172
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2143172

Branch: refs/heads/master
Commit: f2143172feca2925832c8b26c3c9fbbb92ecd48f
Parents: fcd264a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Mar 12 22:46:27 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Mar 13 23:31:33 2017 +0800

----------------------------------------------------------------------
 docs/dev/custom_serializers.md                  | 12 +++
 .../typeutils/runtime/kryo/JavaSerializer.java  | 82 ++++++++++++++++++++
 .../typeutils/runtime/kryo/KryoSerializer.java  |  5 +-
 .../apache/flink/util/InstantiationUtil.java    |  4 +-
 4 files changed, 98 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2143172/docs/dev/custom_serializers.md
----------------------------------------------------------------------
diff --git a/docs/dev/custom_serializers.md b/docs/dev/custom_serializers.md
index 2b72ca0..ddfc2ee 100644
--- a/docs/dev/custom_serializers.md
+++ b/docs/dev/custom_serializers.md
@@ -109,4 +109,16 @@ For Google Protobuf you need the following Maven dependency:
 
 Please adjust the versions of both libraries as needed.
 
+### Issue with using Kryo's `JavaSerializer` 
 
+If you register Kryo's `JavaSerializer` for your custom type, you may
+encounter `ClassNotFoundException`s even though your custom type class is
+included in the submitted user code jar. This is due to a known issue with
+Kryo's `JavaSerializer`, which may use the wrong classloader for deserialization.
+
+In this case, you should use `org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer`
+instead to resolve the issue. This is a reimplemented `JavaSerializer` in Flink
+that makes sure the user code classloader is used.
+
+Please refer to [FLINK-6025](https://issues.apache.org/jira/browse/FLINK-6025)
+for more details.
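
For illustration, a minimal sketch of what such a registration could look like on
the ExecutionConfig (`MyCustomType` is a hypothetical user class, not part of this
change):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // register Flink's reimplemented JavaSerializer (not Kryo's) as the Kryo
    // serializer for the type, so deserialization uses the user code classloader
    env.getConfig().addDefaultKryoSerializer(
        MyCustomType.class,
        org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer.class);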

http://git-wip-us.apache.org/repos/asf/flink/blob/f2143172/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
new file mode 100644
index 0000000..a51647c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.util.ObjectMap;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * This is a reimplementation of Kryo's {@link com.esotericsoftware.kryo.serializers.JavaSerializer}
+ * that additionally makes sure the {@link ObjectInputStream} used for deserialization specifically uses Kryo's
+ * registered classloader.
+ *
+ * Flink maintains this reimplementation due to a known issue with Kryo's {@code JavaSerializer}, in which the wrong
+ * classloader may be used for deserialization, leading to {@link ClassNotFoundException}s.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-6025">FLINK-6025</a>
+ * @see <a href="https://github.com/EsotericSoftware/kryo/pull/483">Known issue with Kryo's JavaSerializer</a>
+ *
+ * @param <T> The type to be serialized.
+ */
+public class JavaSerializer<T> extends Serializer<T> {
+
+	public JavaSerializer() {}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void write(Kryo kryo, Output output, T o) {
+		try {
+			ObjectMap graphContext = kryo.getGraphContext();
+			ObjectOutputStream objectStream = (ObjectOutputStream)graphContext.get(this);
+			if (objectStream == null) {
+				objectStream = new ObjectOutputStream(output);
+				graphContext.put(this, objectStream);
+			}
+			objectStream.writeObject(o);
+			objectStream.flush();
+		} catch (Exception ex) {
+			throw new KryoException("Error during Java serialization.", ex);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public T read(Kryo kryo, Input input, Class aClass) {
+		try {
+			ObjectMap graphContext = kryo.getGraphContext();
+			ObjectInputStream objectStream = (ObjectInputStream)graphContext.get(this);
+			if (objectStream == null) {
+				// make sure we use Kryo's classloader
+				objectStream = new InstantiationUtil.ClassLoaderObjectInputStream(input, kryo.getClassLoader());
+				graphContext.put(this, objectStream);
+			}
+			return (T) objectStream.readObject();
+		} catch (Exception ex) {
+			throw new KryoException("Error during Java deserialization.", ex);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2143172/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index e74e251..44c952a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -24,7 +24,6 @@ import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
 
 import org.apache.avro.generic.GenericData;
 
@@ -130,7 +129,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
 	@Override
 	public KryoSerializer<T> duplicate() {
-		return new KryoSerializer<T>(this);
+		return new KryoSerializer<>(this);
 	}
 
 	@Override
@@ -331,6 +330,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			kryo.setReferences(true);
 			
 			// Throwable and all subclasses should be serialized via java serialization
+			// Note: the registered JavaSerializer is Flink's own implementation, and not Kryo's.
+			//       This is due to a known issue with Kryo's JavaSerializer. See FLINK-6025 for details.
 			kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
 
 			// Add default serializers first, so that the type registrations without a serializer

http://git-wip-us.apache.org/repos/asf/flink/blob/f2143172/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index d4a031c..6441c86 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -45,9 +45,7 @@ import java.util.HashMap;
 public final class InstantiationUtil {
 	
 	/**
-	 * A custom ObjectInputStream that can also load user-code using a
-	 * user-code ClassLoader.
-	 *
+	 * A custom ObjectInputStream that can load classes using a specific ClassLoader.
 	 */
 	public static class ClassLoaderObjectInputStream extends ObjectInputStream {
 


[2/2] flink git commit: [FLINK-3123] [kafka] Allow custom specific start offsets for FlinkKafkaConsumer

Posted by tz...@apache.org.
[FLINK-3123] [kafka] Allow custom specific start offsets for FlinkKafkaConsumer

This closes #2687.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f08e535
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f08e535
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f08e535

Branch: refs/heads/master
Commit: 5f08e53592ebd29cfcd8ee486fcfd6229b82aa69
Parents: f214317
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Mar 10 21:11:42 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Mar 13 23:38:13 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  35 ++++++-
 .../connectors/kafka/Kafka010ITCase.java        |   4 +
 .../connectors/kafka/Kafka08ITCase.java         |   9 +-
 .../connectors/kafka/Kafka09ITCase.java         |   4 +
 .../kafka/FlinkKafkaConsumerBase.java           | 101 ++++++++++++++++++-
 .../connectors/kafka/config/StartupMode.java    |   9 +-
 .../KafkaConsumerPartitionAssignmentTest.java   |  33 ++++--
 .../connectors/kafka/KafkaConsumerTestBase.java |  82 +++++++++++++--
 8 files changed, 251 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 06e40b2..6d58b0c 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -220,7 +220,40 @@ All versions of the Flink Kafka Consumer have the above explicit configuration m
  record. Under these modes, committed offsets in Kafka will be ignored and
  not used as starting positions.
  
-Note that these settings do not affect the start position when the job is
+You can also specify the exact offsets the consumer should start from for each partition:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
+{% endhighlight %}
+</div>
+</div>
+
+The above example configures the consumer to start from the specified offsets for
+partitions 0, 1, and 2 of topic `myTopic`. Each offset value should be the
+offset of the next record that the consumer should read for that partition. Note that
+if the consumer needs to read a partition which does not have a specified
+offset within the provided offsets map, it will fall back to the default
+group offsets behaviour (i.e. `setStartFromGroupOffsets()`) for that
+particular partition.
+
+Note that these start position configuration methods do not affect the start position when the job is
 automatically restored from a failure or manually restored using a savepoint.
 On restore, the start position of each Kafka partition is determined by the
 offsets stored in the savepoint or checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index a375fb6..2085169 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -147,6 +147,10 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		runStartFromGroupOffsets();
 	}
 
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
 
 	// --- offset committing ---
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 3fc00e9..2e7c368 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -90,7 +90,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.getConfig().disableSysoutLogging();
 
-		readSequence(env, StartupMode.GROUP_OFFSETS, standardProps, parallelism, topic, valuesCount, startFrom);
+		readSequence(env, StartupMode.GROUP_OFFSETS, null, standardProps, parallelism, topic, valuesCount, startFrom);
 
 		deleteTestTopic(topic);
 	}
@@ -136,6 +136,11 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		runStartFromGroupOffsets();
 	}
 
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
+
 	// --- offset committing ---
 
 	@Test(timeout = 60000)
@@ -200,7 +205,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		readProps.setProperty("auto.commit.interval.ms", "500");
 
 		// read so that the offset can be committed to ZK
-		readSequence(env, StartupMode.GROUP_OFFSETS, readProps, parallelism, topicName, 100, 0);
+		readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, parallelism, topicName, 100, 0);
 
 		// get the offset
 		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 6added7..ca9965c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -127,6 +127,10 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 		runStartFromGroupOffsets();
 	}
 
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
 
 	// --- offset committing ---
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 144ede8..027751c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -40,11 +40,13 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -104,6 +106,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	/** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}) */
 	protected StartupMode startupMode = StartupMode.GROUP_OFFSETS;
 
+	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS} */
+	protected Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
 	// ------------------------------------------------------------------------
 	//  runtime state (used individually by each parallel subtask) 
 	// ------------------------------------------------------------------------
@@ -210,23 +215,33 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 	/**
 	 * Specifies the consumer to start reading from the earliest offset for all partitions.
-	 * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * This method does not affect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
 	 *
 	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
 		this.startupMode = StartupMode.EARLIEST;
+		this.specificStartupOffsets = null;
 		return this;
 	}
 
 	/**
 	 * Specifies the consumer to start reading from the latest offset for all partitions.
-	 * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * This method does not affect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
 	 *
 	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
 		this.startupMode = StartupMode.LATEST;
+		this.specificStartupOffsets = null;
 		return this;
 	}
 
@@ -236,10 +251,41 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
 	 * set in the configuration properties will be used for the partition.
 	 *
+	 * This method does not affect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
+	 *
 	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
 		this.startupMode = StartupMode.GROUP_OFFSETS;
+		this.specificStartupOffsets = null;
+		return this;
+	}
+
+	/**
+	 * Specifies the consumer to start reading partitions from specific offsets, set independently for each partition.
+	 * The specified offset should be the offset of the next record that will be read from each partition.
+	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * If the provided map of offsets contains entries for partitions that are not subscribed by the
+	 * consumer, those entries will be ignored. If the consumer subscribes to a partition that does not exist in the
+	 * provided map of offsets, the consumer will fall back to the default group offset behaviour (see
+	 * {@link FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
+	 *
+	 * If the specified offset for a partition is invalid, or if the partition has defaulted to group offsets but no
+	 * group offset could be found for it, then the "auto.offset.reset" behaviour set in the configuration
+	 * properties will be used for the partition.
+	 *
+	 * This method does not affect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
+	 *
+	 * @return The consumer object, to allow function chaining.
+	 */
+	public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+		this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+		this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
 		return this;
 	}
 
@@ -269,7 +315,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 					kafkaTopicPartitions,
 					getRuntimeContext().getIndexOfThisSubtask(),
 					getRuntimeContext().getNumberOfParallelSubtasks(),
-					startupMode);
+					startupMode,
+					specificStartupOffsets);
 
 				if (subscribedPartitionsToStartOffsets.size() != 0) {
 					switch (startupMode) {
@@ -285,6 +332,28 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 								subscribedPartitionsToStartOffsets.size(),
 								subscribedPartitionsToStartOffsets.keySet());
 							break;
+						case SPECIFIC_OFFSETS:
+							LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
+								getRuntimeContext().getIndexOfThisSubtask(),
+								subscribedPartitionsToStartOffsets.size(),
+								specificStartupOffsets,
+								subscribedPartitionsToStartOffsets.keySet());
+
+							List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
+							for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
+								if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+									partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
+								}
+							}
+
+							if (partitionsDefaultedToGroupOffsets.size() > 0) {
+								LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
+										"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
+									getRuntimeContext().getIndexOfThisSubtask(),
+									partitionsDefaultedToGroupOffsets.size(),
+									partitionsDefaultedToGroupOffsets);
+							}
+							break;
 						default:
 						case GROUP_OFFSETS:
 							LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
@@ -550,6 +619,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * @param indexOfThisSubtask the index of this consumer instance
 	 * @param numParallelSubtasks total number of parallel consumer instances
 	 * @param startupMode the configured startup mode for the consumer
+	 * @param specificStartupOffsets specific partition offsets to start from
+	 *                               (only relevant if startupMode is {@link StartupMode#SPECIFIC_OFFSETS})
 	 *
 	 * Note: This method is also exposed for testing.
 	 */
@@ -558,11 +629,31 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			List<KafkaTopicPartition> kafkaTopicPartitions,
 			int indexOfThisSubtask,
 			int numParallelSubtasks,
-			StartupMode startupMode) {
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
 
 		for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
 			if (i % numParallelSubtasks == indexOfThisSubtask) {
-				subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+				if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
+					subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+				} else {
+					if (specificStartupOffsets == null) {
+						throw new IllegalArgumentException(
+							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
+								", but no specific offsets were specified");
+					}
+
+					KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
+
+					Long specificOffset = specificStartupOffsets.get(partition);
+					if (specificOffset != null) {
+						// since the specified offset represents the next record to read, we subtract
+						// one from it so that the initial state of the consumer will be correct
+						subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1);
+					} else {
+						subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+					}
+				}
 			}
 		}
 	}
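
For illustration only, a condensed sketch of the per-partition decision above (all
names taken from the surrounding diff; the internal state stores the last offset
already read, hence the subtraction):

    Long specificOffset = specificStartupOffsets.get(partition);
    long startOffsetState = (specificOffset != null)
        ? specificOffset - 1                                  // user supplies "next record to read", e.g. 23 -> state 22
        : KafkaTopicPartitionStateSentinel.GROUP_OFFSET;      // no entry: fall back to committed group offsets
    subscribedPartitionsToStartOffsets.put(partition, startOffsetState);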

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
index f796e62..8fc2fe0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -30,7 +30,14 @@ public enum StartupMode {
 	EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
 
 	/** Start from the latest offset */
-	LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+	LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),
+
+	/**
+	 * Start from user-supplied specific offsets for each partition.
+	 * Since this mode will have specific offsets to start with, we do not need a sentinel value;
+	 * Long.MIN_VALUE is used only as a placeholder.
+	 */
+	SPECIFIC_OFFSETS(Long.MIN_VALUE);
 
 	/** The sentinel offset value corresponding to this startup mode */
 	private long stateSentinel;

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 379d53a..c24640d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -56,7 +56,8 @@ public class KafkaConsumerPartitionAssignmentTest {
 					inPartitions,
 					i,
 					inPartitions.size(),
-					StartupMode.GROUP_OFFSETS);
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -95,7 +96,8 @@ public class KafkaConsumerPartitionAssignmentTest {
 					partitions,
 					i,
 					numConsumers,
-					StartupMode.GROUP_OFFSETS);
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -138,7 +140,8 @@ public class KafkaConsumerPartitionAssignmentTest {
 					inPartitions,
 					i,
 					numConsumers,
-					StartupMode.GROUP_OFFSETS);
+					StartupMode.GROUP_OFFSETS,
+					null);
 
 				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -169,7 +172,8 @@ public class KafkaConsumerPartitionAssignmentTest {
 				ep,
 				2,
 				4,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
 
 			subscribedPartitionsToStartOffsets = new HashMap<>();
@@ -178,7 +182,8 @@ public class KafkaConsumerPartitionAssignmentTest {
 				ep,
 				0,
 				1,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
 		}
 		catch (Exception e) {
@@ -218,21 +223,24 @@ public class KafkaConsumerPartitionAssignmentTest {
 				initialPartitions,
 				0,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets2,
 				initialPartitions,
 				1,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets3,
 				initialPartitions,
 				2,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			List<KafkaTopicPartition> subscribedPartitions1 = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
 			List<KafkaTopicPartition> subscribedPartitions2 = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
@@ -274,21 +282,24 @@ public class KafkaConsumerPartitionAssignmentTest {
 				newPartitions,
 				0,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets2,
 				newPartitions,
 				1,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
 				subscribedPartitionsToStartOffsets3,
 				newPartitions,
 				2,
 				numConsumers,
-				StartupMode.GROUP_OFFSETS);
+				StartupMode.GROUP_OFFSETS,
+				null);
 
 			List<KafkaTopicPartition> subscribedPartitions1New = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
 			List<KafkaTopicPartition> subscribedPartitions2New = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 580c507..ddac61c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -62,6 +62,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
@@ -349,7 +350,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			(o3 != null) ? o3.intValue() : 0
 		));
 
-		readSequence(env2, StartupMode.GROUP_OFFSETS, standardProps, topicName, partitionsToValuesCountAndStartOffset);
+		readSequence(env2, StartupMode.GROUP_OFFSETS, null, standardProps, topicName, partitionsToValuesCountAndStartOffset);
 
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
@@ -465,7 +466,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
 		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
 
-		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+		readSequence(env, StartupMode.EARLIEST, null, readProps, parallelism, topicName, recordsInEachPartition, 0);
 
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
@@ -619,7 +620,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * 	partition 2 --> start from offset 43, read to offset 49 (7 records)
 	 */
 	public void runStartFromGroupOffsets() throws Exception {
-		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		// 3 partitions with 50 records each (offsets 0-49)
 		final int parallelism = 3;
 		final int recordsInEachPartition = 50;
 
@@ -645,7 +646,71 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(50, 0)); // partition 1 should read offset 0-49
 		partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(7, 43));	// partition 2 should read offset 43-49
 
-		readSequence(env, StartupMode.GROUP_OFFSETS, readProps, topicName, partitionsToValueCountAndStartOffsets);
+		readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, topicName, partitionsToValueCountAndStartOffsets);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
+
+	/**
+	 * This test ensures that the consumer correctly uses user-supplied specific offsets when explicitly configured to
+	 * start from specific offsets. For partitions for which no specific offset is provided, the starting position
+	 * should fall back to the group offsets behaviour.
+	 *
+	 * 4 partitions will have 50 records with offsets 0 to 49. The supplied specific offsets map is:
+	 * 	partition 0 --> start from offset 19
+	 * 	partition 1 --> not set
+	 * 	partition 2 --> start from offset 22
+	 * 	partition 3 --> not set
+	 * 	partition 4 --> start from offset 26 (this should be ignored because the partition does not exist)
+	 *
+	 * The partitions and their committed group offsets are set up as:
+	 * 	partition 0 --> committed offset 23
+	 * 	partition 1 --> committed offset 31
+	 * 	partition 2 --> committed offset 43
+	 * 	partition 3 --> no commit offset
+	 *
+	 * When configured to start from these specific offsets, each partition should read:
+	 * 	partition 0 --> start from offset 19, read to offset 49 (31 records)
+	 * 	partition 1 --> fall back to group offsets, so start from offset 31, read to offset 49 (19 records)
+	 * 	partition 2 --> start from offset 22, read to offset 49 (28 records)
+	 * 	partition 3 --> fall back to group offsets, but since there is no group offset for this partition,
+	 * 	                will default to "auto.offset.reset" (set to "earliest"),
+	 * 	                so start from offset 0, read to offset 49 (50 records)
+	 */
+	public void runStartFromSpecificOffsets() throws Exception {
+		// 4 partitions with 50 records each (offsets 0-49)
+		final int parallelism = 4;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testStartFromSpecificOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "earliest"); // partition 3 should default back to this behaviour
+
+		Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+		specificStartupOffsets.put(new KafkaTopicPartition(topicName, 0), 19L);
+		specificStartupOffsets.put(new KafkaTopicPartition(topicName, 2), 22L);
+		specificStartupOffsets.put(new KafkaTopicPartition(topicName, 4), 26L); // non-existing partition, should be ignored
+
+		// only the committed offset for partition 1 should be used, because partition 1 has no entry in the specific offsets map
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+		Map<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets = new HashMap<>();
+		partitionsToValueCountAndStartOffsets.put(0, new Tuple2<>(31, 19)); // partition 0 should read offset 19-49
+		partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(19, 31)); // partition 1 should read offset 31-49
+		partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22));	// partition 2 should read offset 22-49
+		partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0));	// partition 3 should read offset 0-49
+
+		readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets);
 
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
@@ -1781,6 +1846,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 */
 	protected void readSequence(final StreamExecutionEnvironment env,
 								final StartupMode startupMode,
+								final Map<KafkaTopicPartition, Long> specificStartupOffsets,
 								final Properties cc,
 								final String topicName,
 								final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception {
@@ -1807,6 +1873,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			case LATEST:
 				consumer.setStartFromLatest();
 				break;
+			case SPECIFIC_OFFSETS:
+				consumer.setStartFromSpecificOffsets(specificStartupOffsets);
+				break;
 			case GROUP_OFFSETS:
 				consumer.setStartFromGroupOffsets();
 				break;
@@ -1874,11 +1943,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Properties, String, Map)} to
+	 * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Map, Properties, String, Map)} to
 	 * expect reading from the same start offset and the same value count for all partitions of a single Kafka topic.
 	 */
 	protected void readSequence(final StreamExecutionEnvironment env,
 								final StartupMode startupMode,
+								final Map<KafkaTopicPartition, Long> specificStartupOffsets,
 								final Properties cc,
 								final int sourceParallelism,
 								final String topicName,
@@ -1888,7 +1958,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		for (int i = 0; i < sourceParallelism; i++) {
 			partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom));
 		}
-		readSequence(env, startupMode, cc, topicName, partitionsToValuesCountAndStartOffset);
+		readSequence(env, startupMode, specificStartupOffsets, cc, topicName, partitionsToValuesCountAndStartOffset);
 	}
 
 	protected String writeSequence(