You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/04/13 10:31:07 UTC

[09/14] flink git commit: [FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
new file mode 100644
index 0000000..6bad180
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
+ * 
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class);
+
+	// ------------------------------------------------------------------------
+
+	/** The schema to convert between Kafka's byte messages, and Flink's objects */
+	private final KeyedDeserializationSchema<T> deserializer;
+
+	/** The subtask's runtime context */
+	private final RuntimeContext runtimeContext;
+
+	/** The configuration for the Kafka consumer */
+	private final Properties kafkaProperties;
+
+	/** The maximum number of milliseconds to wait for a fetch batch */
+	private final long pollTimeout;
+
+	/** Flag whether to register Kafka metrics as Flink accumulators */
+	private final boolean forwardKafkaMetrics;
+
+	/** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */
+	private final Object consumerLock = new Object();
+
+	/** Reference to the Kafka consumer, once it is created */
+	private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+	/** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
+	private volatile ExceptionProxy errorHandler;
+
+	/** Flag to mark the main work loop as alive */
+	private volatile boolean running = true;
+
+	// ------------------------------------------------------------------------
+
+	public Kafka09Fetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> assignedPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			StreamingRuntimeContext runtimeContext,
+			KeyedDeserializationSchema<T> deserializer,
+			Properties kafkaProperties,
+			long pollTimeout,
+			boolean forwardKafkaMetrics) throws Exception
+	{
+		super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
+
+		this.deserializer = deserializer;
+		this.runtimeContext = runtimeContext;
+		this.kafkaProperties = kafkaProperties;
+		this.pollTimeout = pollTimeout;
+		this.forwardKafkaMetrics = forwardKafkaMetrics;
+
+		// if checkpointing is enabled, we are not automatically committing to Kafka.
+		kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+				Boolean.toString(!runtimeContext.isCheckpointingEnabled()));
+	}
+
+	// ------------------------------------------------------------------------
+	//  Fetcher work methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void runFetchLoop() throws Exception {
+		this.errorHandler = new ExceptionProxy(Thread.currentThread());
+
+		// rather than running the main fetch loop directly here, we spawn a dedicated thread
+		// this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
+		Thread runner = new Thread(this, "Kafka 0.9 Fetcher for " + runtimeContext.getTaskNameWithSubtasks());
+		runner.setDaemon(true);
+		runner.start();
+
+		try {
+			runner.join();
+		} catch (InterruptedException e) {
+			// may be the result of a wake-up after an exception. we ignore this here and only
+			// restore the interruption state
+			Thread.currentThread().interrupt();
+		}
+
+		// make sure we propagate any exception that occurred in the concurrent fetch thread,
+		// before leaving this method
+		this.errorHandler.checkAndThrowException();
+	}
+
+	@Override
+	public void cancel() {
+		// flag the main thread to exit
+		running = false;
+
+		// NOTE:
+		//   - We cannot interrupt the runner thread, because the Kafka consumer may
+		//     deadlock when the thread is interrupted while in certain methods
+		//   - We cannot call close() on the consumer, because it will actually throw
+		//     an exception if a concurrent call is in progress
+
+		// make sure the consumer finds out faster that we are shutting down 
+		if (consumer != null) {
+			consumer.wakeup();
+		}
+	}
+
+	@Override
+	public void run() {
+		// This method initializes the KafkaConsumer and guarantees it is torn down properly.
+		// This is important, because the consumer has multi-threading issues,
+		// including concurrent 'close()' calls.
+
+		final KafkaConsumer<byte[], byte[]> consumer;
+		try {
+			consumer = new KafkaConsumer<>(kafkaProperties);
+		}
+		catch (Throwable t) {
+			running = false;
+			errorHandler.reportError(t);
+			return;
+		}
+
+		// from here on, the consumer will be closed properly
+		try {
+			consumer.assign(convertKafkaPartitions(subscribedPartitions()));
+
+			// register Kafka metrics to Flink accumulators
+			if (forwardKafkaMetrics) {
+				Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+				if (metrics == null) {
+					// MapR's Kafka implementation returns null here.
+					LOG.info("Consumer implementation does not support metrics");
+				} else {
+					// we have metrics, register them where possible
+					for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
+						String name = "KafkaConsumer-" + metric.getKey().name();
+						DefaultKafkaMetricAccumulator kafkaAccumulator =
+								DefaultKafkaMetricAccumulator.createFor(metric.getValue());
+
+						// best effort: we only add the accumulator if available.
+						if (kafkaAccumulator != null) {
+							runtimeContext.addAccumulator(name, kafkaAccumulator);
+						}
+					}
+				}
+			}
+
+			// seek the consumer to the initial offsets
+			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+				if (partition.isOffsetDefined()) {
+					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+				}
+			}
+
+			// from now on, external operations may call the consumer
+			this.consumer = consumer;
+
+			// main fetch loop
+			while (running) {
+				// get the next batch of records
+				final ConsumerRecords<byte[], byte[]> records;
+				synchronized (consumerLock) {
+					try {
+						records = consumer.poll(pollTimeout);
+					}
+					catch (WakeupException we) {
+						if (running) {
+							throw we;
+						} else {
+							continue;
+						}
+					}
+				}
+
+				// get the records for each topic partition
+				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+					
+					List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle());
+
+					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+						T value = deserializer.deserialize(
+								record.key(), record.value(),
+								record.topic(), record.partition(), record.offset());
+
+						if (deserializer.isEndOfStream(value)) {
+							// end of stream signaled
+							running = false;
+							break;
+						}
+
+						// emit the actual record. this also update offset state atomically
+						// and deals with timestamps and watermark generation
+						emitRecord(value, partition, record.offset());
+					}
+				}
+			}
+			// end main fetch loop
+		}
+		catch (Throwable t) {
+			if (running) {
+				running = false;
+				errorHandler.reportError(t);
+			} else {
+				LOG.debug("Stopped ConsumerThread threw exception", t);
+			}
+		}
+		finally {
+			try {
+				synchronized (consumerLock) {
+					consumer.close();
+				}
+			} catch (Throwable t) {
+				LOG.warn("Error while closing Kafka 0.9 consumer", t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Kafka 0.9 specific fetcher behavior
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
+		return new TopicPartition(partition.getTopic(), partition.getPartition());
+	}
+
+	@Override
+	public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+		KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
+		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
+
+		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
+			Long offset = offsets.get(partition.getKafkaTopicPartition());
+			if (offset != null) {
+				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset, ""));
+			}
+		}
+
+		if (this.consumer != null) {
+			synchronized (consumerLock) {
+				this.consumer.commitSync(offsetsToCommit);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	public static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
+		ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
+		for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+			result.add(p.getKafkaPartitionHandle());
+		}
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 82e1dce..afb0056 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -27,11 +27,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 	// ------------------------------------------------------------------------
 
 	@Test(timeout = 60000)
-	public void testCheckpointing() throws Exception {
-		runCheckpointingTest();
-	}
-
-	@Test(timeout = 60000)
 	public void testFailOnNoBroker() throws Exception {
 		runFailOnNoBrokerTest();
 	}
@@ -41,15 +36,15 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 		runSimpleConcurrentProducerConsumerTopology();
 	}
 
-	@Test(timeout = 60000)
-	public void testPunctuatedExplicitWMConsumer() throws Exception {
-		runExplicitPunctuatedWMgeneratingConsumerTest(false);
-	}
+//	@Test(timeout = 60000)
+//	public void testPunctuatedExplicitWMConsumer() throws Exception {
+//		runExplicitPunctuatedWMgeneratingConsumerTest(false);
+//	}
 
-	@Test(timeout = 60000)
-	public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-		runExplicitPunctuatedWMgeneratingConsumerTest(true);
-	}
+//	@Test(timeout = 60000)
+//	public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
+//		runExplicitPunctuatedWMgeneratingConsumerTest(true);
+//	}
 
 	@Test(timeout = 60000)
 	public void testKeyValueSupport() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index a2c4f73..b80a231 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -22,19 +22,23 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
+
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
@@ -60,7 +64,7 @@ public class KafkaProducerTest extends TestLogger {
 			
 			// partition setup
 			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-					Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));
+					Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
 
 			// failure when trying to send an element
 			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
index 74b35af..c1b21b7 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
+@SuppressWarnings("serial")
 public class KafkaShortRetention09ITCase extends KafkaShortRetentionTestBase {
 
 	@Test(timeout=60000)

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
index 6bdfb48..fbeb110 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -25,5 +25,6 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index d9e813f..0ca8fd5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -18,427 +18,291 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.TimestampAssigner;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-
-import static java.util.Objects.requireNonNull;
-import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.checkArgument;
-
-public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
-		implements CheckpointListener, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T>, Triggerable {
-
-	// ------------------------------------------------------------------------
 
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * Base class of all Flink Kafka Consumer data sources.
+ * This implements the common behavior across all Kafka versions.
+ * 
+ * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
+ * {@link AbstractFetcher}.
+ * 
+ * @param <T> The type of records produced by this data source
+ */
+public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
+		CheckpointListener,
+		CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>,
+		ResultTypeQueryable<T>
+{
 	private static final long serialVersionUID = -6272159445203409112L;
 
-	/** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
-	 * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
-	public static final long OFFSET_NOT_SET = -915623761776L;
-
+	protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+	
 	/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
 	public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
 
-
-	/** The schema to convert between Kafka#s byte messages, and Flink's objects */
+	// ------------------------------------------------------------------------
+	//  configuration state, set on the client relevant for all subtasks
+	// ------------------------------------------------------------------------
+	
+	/** The schema to convert between Kafka's byte messages, and Flink's objects */
 	protected final KeyedDeserializationSchema<T> deserializer;
 
-	// ------  Runtime State  -------
+	/** The set of topic partitions that the source will read */
+	protected List<KafkaTopicPartition> allSubscribedPartitions;
+	
+	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+	 * to exploit per-partition timestamp characteristics.
+	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
+	private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
+	
+	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+	 * to exploit per-partition timestamp characteristics. 
+	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
+	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
 
+	// ------------------------------------------------------------------------
+	//  runtime state (used individually by each parallel subtask) 
+	// ------------------------------------------------------------------------
+	
 	/** Data for pending but uncommitted checkpoints */
-	protected final LinkedMap pendingCheckpoints = new LinkedMap();
-
-	/**
-	 * Information about the partitions being read by the local consumer. This contains:
-	 * offsets of the last returned elements, and if a timestamp assigner is used, it
-	 * also contains the maximum seen timestamp in the partition and if the partition
-	 * still receives elements or it is inactive.
-	 */
-	protected transient HashMap<KafkaTopicPartition, KafkaPartitionState> partitionState;
+	private final LinkedMap pendingCheckpoints = new LinkedMap();
 
+	/** The fetcher implements the connections to the Kafka brokers */
+	private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
+	
 	/** The offsets to restore to, if the consumer restores state from a checkpoint */
-	protected transient HashMap<KafkaTopicPartition, Long> restoreToOffset;
-
+	private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset;
+	
 	/** Flag indicating whether the consumer is still running **/
-	protected volatile boolean running = true;
+	private volatile boolean running = true;
 
 	// ------------------------------------------------------------------------
-	//							WATERMARK EMISSION
-	// ------------------------------------------------------------------------
 
 	/**
-	 * The user-specified methods to extract the timestamps from the records in Kafka, and
-	 * to decide when to emit watermarks.
-	 */
-	private AssignerWithPunctuatedWatermarks<T> punctuatedWatermarkAssigner;
-
-	/**
-	 * The user-specified methods to extract the timestamps from the records in Kafka, and
-	 * to decide when to emit watermarks.
-	 */
-	private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
-
-	private StreamingRuntimeContext runtime = null;
-
-	private SourceContext<T> srcContext = null;
-
-	/**
-	 * The interval between consecutive periodic watermark emissions,
-	 * as configured via the {@link ExecutionConfig#getAutoWatermarkInterval()}.
-	 */
-	private long watermarkInterval = -1;
-
-	/** The last emitted watermark. */
-	private long lastEmittedWatermark = Long.MIN_VALUE;
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
-	 *
-	 * <p>To determine which kink of fetcher and offset handler to use, please refer to the docs
-	 * at the beginning of this class.</p>
+	 * Base constructor.
 	 *
 	 * @param deserializer
 	 *           The deserializer to turn raw byte messages into Java/Scala objects.
-	 * @param props
-	 *           The properties that are used to configure both the fetcher and the offset handler.
 	 */
-	public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer, Properties props) {
-		this.deserializer = requireNonNull(deserializer, "valueDeserializer");
+	public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer) {
+		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
 	}
 
 	/**
-	 * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner. Bare in mind
-	 * that the source can either have an {@link AssignerWithPunctuatedWatermarks} or an
-	 * {@link AssignerWithPeriodicWatermarks}, not both.
+	 * This method must be called from the subclasses, to set the list of all subscribed partitions
+	 * that this consumer will fetch from (across all subtasks).
+	 * 
+	 * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from.
 	 */
-	public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) {
-		checkEmitterDuringInit();
-		this.punctuatedWatermarkAssigner = assigner;
-		return this;
+	protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
+		checkNotNull(allSubscribedPartitions);
+		this.allSubscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Configuration
+	// ------------------------------------------------------------------------
+	
 	/**
-	 * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks periodically. Bare in mind that the
-	 * source can either have an {@link AssignerWithPunctuatedWatermarks} or an
-	 * {@link AssignerWithPeriodicWatermarks}, not both.
+	 * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
+	 * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
+	 * in the same way as in the Flink runtime, when streams are merged.
+	 * 
+	 * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
+	 * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
+	 * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
+	 * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
+	 * parallel source subtask reads more that one partition.
+	 * 
+	 * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
+	 * partition, allows users to let them exploit the per-partition characteristics.
+	 * 
+	 * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
+	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+	 * 
+	 * @param assigner The timestamp assigner / watermark generator to use.
+	 * @return The consumer object, to allow function chaining.   
 	 */
-	public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) {
-		checkEmitterDuringInit();
-		this.periodicWatermarkAssigner = assigner;
-		return this;
-	}
-
-	/**
-	 * Processes the element after having been read from Kafka and deserialized, and updates the
-	 * last read offset for the specifies partition. These two actions should be performed in
-	 * an atomic way in order to guarantee exactly once semantics.
-	 * @param sourceContext
-	 *           The context the task operates in.
-	 * @param partDescriptor
-	 *            A descriptor containing the topic and the id of the partition.
-	 * @param value
-	 *           The element to process.
-	 * @param offset
-	 *           The offset of the element in the partition.
-	 * */
-	public void processElement(SourceContext<T> sourceContext, KafkaTopicPartition partDescriptor, T value, long offset) {
-		if (punctuatedWatermarkAssigner == null && periodicWatermarkAssigner == null) {
-			// the case where no watermark emitter is specified.
-			sourceContext.collect(value);
-		} else {
-
-			if (srcContext == null) {
-				srcContext = sourceContext;
-			}
-
-			long extractedTimestamp = extractTimestampAndEmitElement(partDescriptor, value);
-
-			// depending on the specified watermark emitter, either send a punctuated watermark,
-			// or set the timer for the first periodic watermark. In the periodic case, we set the timer
-			// only for the first watermark, as it is the trigger() that will set the subsequent ones.
-
-			if (punctuatedWatermarkAssigner != null) {
-				final Watermark nextWatermark = punctuatedWatermarkAssigner
-					.checkAndGetNextWatermark(value, extractedTimestamp);
-				if (nextWatermark != null) {
-					emitWatermarkIfMarkingProgress(sourceContext);
-				}
-			} else if(periodicWatermarkAssigner != null && runtime == null) {
-				runtime = (StreamingRuntimeContext) getRuntimeContext();
-				watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
-				if (watermarkInterval > 0) {
-					runtime.registerTimer(System.currentTimeMillis() + watermarkInterval, this);
-				}
-			}
+	public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) {
+		checkNotNull(assigner);
+		
+		if (this.periodicWatermarkAssigner != null) {
+			throw new IllegalStateException("A periodic watermark emitter has already been set.");
+		}
+		try {
+			this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
+			return this;
+		} catch (Exception e) {
+			throw new IllegalArgumentException("The given assigner is not serializable", e);
 		}
-		updateOffsetForPartition(partDescriptor, offset);
 	}
 
 	/**
-	 * Extract the timestamp from the element based on the user-specified extractor,
-	 * emit the element with the new timestamp, and update the partition monitoring info (if necessary).
-	 * In more detail, upon reception of an element with a timestamp greater than the greatest timestamp
-	 * seen so far in that partition, this method updates the maximum timestamp seen for that partition,
-	 * and marks the partition as {@code active}, i.e. it still receives fresh data.
-	 * @param partDescriptor the partition the new element belongs to.
-	 * @param value the element to be forwarded.
-	 * @return the timestamp of the new element.
+	 * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
+	 * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
+	 * in the same way as in the Flink runtime, when streams are merged.
+	 *
+	 * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
+	 * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
+	 * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
+	 * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
+	 * parallel source subtask reads more that one partition.
+	 *
+	 * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
+	 * partition, allows users to let them exploit the per-partition characteristics.
+	 *
+	 * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
+	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+	 *
+	 * @param assigner The timestamp assigner / watermark generator to use.
+	 * @return The consumer object, to allow function chaining.   
 	 */
-	private long extractTimestampAndEmitElement(KafkaTopicPartition partDescriptor, T value) {
-		long extractedTimestamp = getTimestampAssigner().extractTimestamp(value, Long.MIN_VALUE);
-		srcContext.collectWithTimestamp(value, extractedTimestamp);
-		updateMaximumTimestampForPartition(partDescriptor, extractedTimestamp);
-		return extractedTimestamp;
-	}
-
-	/**
-	 * Upon reception of an element with a timestamp greater than the greatest timestamp seen so far in the partition,
-	 * this method updates the maximum timestamp seen for that partition to {@code timestamp}, and marks the partition
-	 * as {@code active}, i.e. it still receives fresh data. If the partition is not known to the system, then a new
-	 * {@link KafkaPartitionState} is created and is associated to the new partition for future monitoring.
-	 * @param partDescriptor
-	 *            A descriptor containing the topic and the id of the partition.
-	 * @param timestamp
-	 *           The timestamp to set the minimum to, if smaller than the already existing one.
-	 * @return {@code true} if the minimum was updated successfully to {@code timestamp}, {@code false}
-	 *           if the previous value is smaller than the provided timestamp
-	 * */
-	private boolean updateMaximumTimestampForPartition(KafkaTopicPartition partDescriptor, long timestamp) {
-		KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
-
-		if(timestamp > info.getMaxTimestamp()) {
-
-			// the flag is set to false as soon as the current partition's max timestamp is sent as a watermark.
-			// if then, and for that partition, only late elements arrive, then the max timestamp will stay the
-			// same, and it will keep the overall system from progressing.
-			// To avoid this, we only mark a partition as active on non-late elements.
-
-			info.setActive(true);
-			info.setMaxTimestamp(timestamp);
-			return  true;
+	public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) {
+		checkNotNull(assigner);
+		
+		if (this.punctuatedWatermarkAssigner != null) {
+			throw new IllegalStateException("A punctuated watermark emitter has already been set.");
+		}
+		try {
+			this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
+			return this;
+		} catch (Exception e) {
+			throw new IllegalArgumentException("The given assigner is not serializable", e);
 		}
-		return false;
 	}
 
-	/**
-	 * Updates the last read offset for the partition specified by the {@code partDescriptor} to {@code offset}.
-	 * If it is the first time we see the partition, then a new {@link KafkaPartitionState} is created to monitor
-	 * this specific partition.
-	 * @param partDescriptor the partition whose info to update.
-	 * @param offset the last read offset of the partition.
-	 */
-	public void updateOffsetForPartition(KafkaTopicPartition partDescriptor, long offset) {
-		KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
-		info.setOffset(offset);
-	}
+	// ------------------------------------------------------------------------
+	//  Work methods
+	// ------------------------------------------------------------------------
 
 	@Override
-	public void trigger(long timestamp) throws Exception {
-		if(this.srcContext == null) {
-			// if the trigger is called before any elements, then we
-			// just set the next timer to fire when it should and we
-			// ignore the triggering as this would produce no results.
-			setNextWatermarkTimer();
-			return;
+	public void run(SourceContext<T> sourceContext) throws Exception {
+		if (allSubscribedPartitions == null) {
+			throw new Exception("The partitions were not set for the consumer");
 		}
-
-		// this is valid because this method is only called when watermarks
-		// are set to be emitted periodically.
-		final Watermark nextWatermark = periodicWatermarkAssigner.getCurrentWatermark();
-		if(nextWatermark != null) {
-			emitWatermarkIfMarkingProgress(srcContext);
-		}
-		setNextWatermarkTimer();
-	}
-
-	/**
-	 * Emits a new watermark, with timestamp equal to the minimum across all the maximum timestamps
-	 * seen per local partition (across all topics). The new watermark is emitted if and only if
-	 * it signals progress in event-time, i.e. if its timestamp is greater than the timestamp of
-	 * the last emitted watermark. In addition, this method marks as inactive the partition whose
-	 * timestamp was emitted as watermark, i.e. the one with the minimum across the maximum timestamps
-	 * of the local partitions. This is done to avoid not making progress because
-	 * a partition stopped receiving data. The partition is going to be marked as {@code active}
-	 * as soon as the <i>next non-late</i> element arrives.
-	 *
-	 * @return {@code true} if the Watermark was successfully emitted, {@code false} otherwise.
-	 */
-	private boolean emitWatermarkIfMarkingProgress(SourceFunction.SourceContext<T> sourceContext) {
-		Tuple2<KafkaTopicPartition, Long> globalMinTs = getMinTimestampAcrossAllTopics();
-		if(globalMinTs.f0 != null ) {
-			synchronized (sourceContext.getCheckpointLock()) {
-				long minTs = globalMinTs.f1;
-				if(minTs > lastEmittedWatermark) {
-					lastEmittedWatermark = minTs;
-					Watermark toEmit = new Watermark(minTs);
-					sourceContext.emitWatermark(toEmit);
-					return true;
-				}
+		
+		// figure out which partitions this subtask should process
+		final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(allSubscribedPartitions,
+				getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
+		
+		// we need only do work, if we actually have partitions assigned
+		if (!thisSubtaskPartitions.isEmpty()) {
+
+			// (1) create the fetcher that will communicate with the Kafka brokers
+			final AbstractFetcher<T, ?> fetcher = createFetcher(
+					sourceContext, thisSubtaskPartitions, 
+					periodicWatermarkAssigner, punctuatedWatermarkAssigner,
+					(StreamingRuntimeContext) getRuntimeContext());
+
+			// (2) set the fetcher to the restored checkpoint offsets
+			if (restoreToOffset != null) {
+				fetcher.restoreOffsets(restoreToOffset);
 			}
-		}
-		return false;
-	}
 
-	/**
-	 * Kafka sources with timestamp extractors are expected to keep the maximum timestamp seen per
-	 * partition they are reading from. This is to mark the per-partition event-time progress.
-	 *
-	 * This method iterates this list, and returns the minimum timestamp across these per-partition
-	 * max timestamps, and across all topics. In addition to this information, it also returns the topic and
-	 * the partition within the topic the timestamp belongs to.
-	 */
-	private Tuple2<KafkaTopicPartition, Long> getMinTimestampAcrossAllTopics() {
-		Tuple2<KafkaTopicPartition, Long> minTimestamp = new Tuple2<>(null, Long.MAX_VALUE);
-		for(Map.Entry<KafkaTopicPartition, KafkaPartitionState> entries: partitionState.entrySet()) {
-			KafkaTopicPartition part = entries.getKey();
-			KafkaPartitionState info = entries.getValue();
-
-			if(partitionIsActive(part) && info.getMaxTimestamp() < minTimestamp.f1) {
-				minTimestamp.f0 = part;
-				minTimestamp.f1 = info.getMaxTimestamp();
+			// publish the reference, for snapshot-, commit-, and cancel calls
+			// IMPORTANT: We can only do that now, because only now will calls to
+			//            the fetchers 'snapshotCurrentState()' method return at least
+			//            the restored offsets
+			this.kafkaFetcher = fetcher;
+			if (!running) {
+				return;
 			}
+			
+			// (3) run the fetcher' main work method
+			fetcher.runFetchLoop();
 		}
-
-		if(minTimestamp.f0 != null) {
-			// it means that we have a winner and we have to set its flag to
-			// inactive, until its next non-late element.
-			KafkaTopicPartition partitionDescriptor = minTimestamp.f0;
-			setActiveFlagForPartition(partitionDescriptor, false);
-		}
-
-		return minTimestamp;
-	}
-
-	/**
-	 * Sets the {@code active} flag for a given partition of a topic to {@code isActive}.
-	 * This flag signals if the partition is still receiving data and it is used to avoid the case
-	 * where a partition stops receiving data, so its max seen timestamp does not advance, and it
-	 * holds back the progress of the watermark for all partitions. Note that if the partition is
-	 * not known to the system, then a new {@link KafkaPartitionState} is created and is associated
-	 * to the new partition for future monitoring.
-	 *
-	 * @param partDescriptor
-	 * 				A descriptor containing the topic and the id of the partition.
-	 * @param isActive
-	 * 				The value {@code true} or {@code false} to set the flag to.
-	 */
-	private void setActiveFlagForPartition(KafkaTopicPartition partDescriptor, boolean isActive) {
-		KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
-		info.setActive(isActive);
-	}
-
-	/**
-	 * Gets the statistics for a given partition specified by the {@code partition} argument.
-	 * If it is the first time we see this partition, a new {@link KafkaPartitionState} data structure
-	 * is initialized to monitor it from now on. This method never throws a {@link NullPointerException}.
-	 * @param partition the partition to be fetched.
-	 * @return the gathered statistics for that partition.
-	 * */
-	private KafkaPartitionState getOrInitializeInfo(KafkaTopicPartition partition) {
-		KafkaPartitionState info = partitionState.get(partition);
-		if(info == null) {
-			info = new KafkaPartitionState(partition.getPartition(), FlinkKafkaConsumerBase.OFFSET_NOT_SET);
-			partitionState.put(partition, info);
+		else {
+			// this source never completes, so emit a Long.MAX_VALUE watermark
+			// to not block watermark forwarding
+			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+
+			// wait until this is canceled
+			final Object waitLock = new Object();
+			while (running) {
+				try {
+					//noinspection SynchronizationOnLocalVariableOrMethodParameter
+					synchronized (waitLock) {
+						waitLock.wait();
+					}
+				}
+				catch (InterruptedException e) {
+					if (!running) {
+						// restore the interrupted state, and fall through the loop
+						Thread.currentThread().interrupt();
+					}
+				}
+			}
 		}
-		return info;
 	}
 
-	/**
-	 * Checks if a partition of a topic is still active, i.e. if it still receives data.
-	 * @param partDescriptor
-	 *          A descriptor containing the topic and the id of the partition.
-	 * */
-	private boolean partitionIsActive(KafkaTopicPartition partDescriptor) {
-		KafkaPartitionState info = partitionState.get(partDescriptor);
-		if(info == null) {
-			throw new RuntimeException("Unknown Partition: Topic=" + partDescriptor.getTopic() +
-				" Partition=" + partDescriptor.getPartition());
+	@Override
+	public void cancel() {
+		// set ourselves as not running
+		running = false;
+		
+		// abort the fetcher, if there is one
+		if (kafkaFetcher != null) {
+			kafkaFetcher.cancel();
 		}
-		return info.isActive();
-	}
 
-	private TimestampAssigner<T> getTimestampAssigner() {
-		checkEmitterStateAfterInit();
-		return periodicWatermarkAssigner != null ? periodicWatermarkAssigner : punctuatedWatermarkAssigner;
-	}
-
-	private void setNextWatermarkTimer() {
-		long timeToNextWatermark = System.currentTimeMillis() + watermarkInterval;
-		runtime.registerTimer(timeToNextWatermark, this);
-	}
-
-	private void checkEmitterDuringInit() {
-		if(periodicWatermarkAssigner != null) {
-			throw new RuntimeException("A periodic watermark emitter has already been provided.");
-		} else if(punctuatedWatermarkAssigner != null) {
-			throw new RuntimeException("A punctuated watermark emitter has already been provided.");
-		}
+		// there will be an interrupt() call to the main thread anyways
 	}
 
-	private void checkEmitterStateAfterInit() {
-		if(periodicWatermarkAssigner == null && punctuatedWatermarkAssigner == null) {
-			throw new RuntimeException("The timestamp assigner has not been initialized.");
-		} else if(periodicWatermarkAssigner != null && punctuatedWatermarkAssigner != null) {
-			throw new RuntimeException("The source can either have an assigner with punctuated " +
-				"watermarks or one with periodic watermarks, not both.");
+	@Override
+	public void close() throws Exception {
+		// pretty much the same logic as cancelling
+		try {
+			cancel();
+		} finally {
+			super.close();
 		}
 	}
-
+	
 	// ------------------------------------------------------------------------
 	//  Checkpoint and restore
 	// ------------------------------------------------------------------------
-
-	HashMap<KafkaTopicPartition, KafkaPartitionState> restoreInfoFromCheckpoint() {
-		HashMap<KafkaTopicPartition, KafkaPartitionState> partInfo = new HashMap<>(restoreToOffset.size());
-		for(Map.Entry<KafkaTopicPartition, Long> offsets: restoreToOffset.entrySet()) {
-			KafkaTopicPartition key = offsets.getKey();
-			partInfo.put(key, new KafkaPartitionState(key.getPartition(), offsets.getValue()));
-		}
-		return partInfo;
-	}
-
+	
 	@Override
 	public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		if (partitionState == null) {
-			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-			return null;
-		}
 		if (!running) {
 			LOG.debug("snapshotState() called on closed source");
 			return null;
 		}
-
-		HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>();
-		for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> entry: partitionState.entrySet()) {
-			currentOffsets.put(entry.getKey(), entry.getValue().getOffset());
+		
+		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+		if (fetcher == null) {
+			// the fetcher has not yet been initialized, which means we need to return the
+			// originally restored offsets
+			return restoreToOffset;
 		}
 
+		HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
 					KafkaTopicPartition.toString(currentOffsets), checkpointId, checkpointTimestamp);
@@ -447,7 +311,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		// the map cannot be asynchronously updated, because only one checkpoint call can happen
 		// on this function at a time: either snapshotState() or notifyCheckpointComplete()
 		pendingCheckpoints.put(checkpointId, currentOffsets);
-			
+		
+		// truncate the map, to prevent infinite growth
 		while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
 			pendingCheckpoints.remove(0);
 		}
@@ -457,51 +322,49 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 	@Override
 	public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
-		LOG.info("Setting restore state in Kafka");
+		LOG.info("Setting restore state in the FlinkKafkaConsumer");
 		restoreToOffset = restoredOffsets;
 	}
 
 	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		if (partitionState == null) {
-			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
-			return;
-		}
 		if (!running) {
 			LOG.debug("notifyCheckpointComplete() called on closed source");
 			return;
 		}
+
+		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+		if (fetcher == null) {
+			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+			return;
+		}
 		
 		// only one commit operation must be in progress
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Committing offsets externally for checkpoint {}", checkpointId);
+			LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
 		}
 
 		try {
-			HashMap<KafkaTopicPartition, Long> checkpointOffsets;
-	
-			// the map may be asynchronously updates when snapshotting state, so we synchronize
-			synchronized (pendingCheckpoints) {
-				final int posInMap = pendingCheckpoints.indexOf(checkpointId);
-				if (posInMap == -1) {
-					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
-					return;
-				}
+			final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+			if (posInMap == -1) {
+				LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+				return;
+			}
 
-				//noinspection unchecked
-				checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
+			@SuppressWarnings("unchecked")
+			HashMap<KafkaTopicPartition, Long> checkpointOffsets = 
+					(HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
 
-				
-				// remove older checkpoints in map
-				for (int i = 0; i < posInMap; i++) {
-					pendingCheckpoints.remove(0);
-				}
+			// remove older checkpoints in map
+			for (int i = 0; i < posInMap; i++) {
+				pendingCheckpoints.remove(0);
 			}
+
 			if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
 				LOG.debug("Checkpoint state was empty.");
 				return;
 			}
-			commitOffsets(checkpointOffsets);
+			fetcher.commitSpecificOffsetsToKafka(checkpointOffsets);
 		}
 		catch (Exception e) {
 			if (running) {
@@ -511,33 +374,77 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		}
 	}
 
-	protected abstract void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) throws Exception;
-
-
+	// ------------------------------------------------------------------------
+	//  Kafka Consumer specific methods
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Creates the fetcher that connect to the Kafka brokers, pulls data, deserialized the
+	 * data, and emits it into the data streams.
+	 * 
+	 * @param sourceContext The source context to emit data to.
+	 * @param thisSubtaskPartitions The set of partitions that this subtask should handle.
+	 * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
+	 * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
+	 * @param runtimeContext The task's runtime context.
+	 * 
+	 * @return The instantiated fetcher
+	 * 
+	 * @throws Exception The method should forward exceptions
+	 */
+	protected abstract AbstractFetcher<T, ?> createFetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> thisSubtaskPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			StreamingRuntimeContext runtimeContext) throws Exception;
+	
+	// ------------------------------------------------------------------------
+	//  ResultTypeQueryable methods 
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public TypeInformation<T> getProducedType() {
 		return deserializer.getProducedType();
 	}
 
-	protected static <T> List<T> assignPartitions(List<T> partitions, int numConsumers, int consumerIndex) {
-		checkArgument(numConsumers > 0);
-		checkArgument(consumerIndex < numConsumers);
-
-		List<T> partitionsToSub = new ArrayList<>();
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
 
-		for (int i = 0; i < partitions.size(); i++) {
+	/**
+	 * Selects which of the given partitions should be handled by a specific consumer,
+	 * given a certain number of consumers.
+	 * 
+	 * @param allPartitions The partitions to select from
+	 * @param numConsumers The number of consumers
+	 * @param consumerIndex The index of the specific consumer
+	 * 
+	 * @return The sublist of partitions to be handled by that consumer.
+	 */
+	protected static List<KafkaTopicPartition> assignPartitions(
+			List<KafkaTopicPartition> allPartitions,
+			int numConsumers, int consumerIndex)
+	{
+		final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
+				allPartitions.size() / numConsumers + 1);
+
+		for (int i = 0; i < allPartitions.size(); i++) {
 			if (i % numConsumers == consumerIndex) {
-				partitionsToSub.add(partitions.get(i));
+				thisSubtaskPartitions.add(allPartitions.get(i));
 			}
 		}
-		return partitionsToSub;
+		
+		return thisSubtaskPartitions;
 	}
-
+	
 	/**
-	 * Method to log partition information.
+	 * Logs the partition information in INFO level.
+	 * 
+	 * @param logger The logger to log to.
 	 * @param partitionInfos List of subscribed partitions
 	 */
-	public static void logPartitionInfo(List<KafkaTopicPartition> partitionInfos) {
+	protected static void logPartitionInfo(Logger logger, List<KafkaTopicPartition> partitionInfos) {
 		Map<String, Integer> countPerTopic = new HashMap<>();
 		for (KafkaTopicPartition partition : partitionInfos) {
 			Integer count = countPerTopic.get(partition.getTopic());
@@ -548,12 +455,13 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			}
 			countPerTopic.put(partition.getTopic(), count);
 		}
-		StringBuilder sb = new StringBuilder();
+		StringBuilder sb = new StringBuilder(
+				"Consumer is going to read the following topics (with number of partitions): ");
+		
 		for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
 			sb.append(e.getKey()).append(" (").append(e.getValue()).append("), ");
 		}
-		LOG.info("Consumer is going to read the following topics (with number of partitions): {}", sb.toString());
+		
+		logger.info(sb.toString());
 	}
-
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
new file mode 100644
index 0000000..594aa66
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all fetchers, which implement the connections to Kafka brokers and
+ * pull records from Kafka partitions.
+ * 
+ * <p>This fetcher base class implements the logic around emitting records and tracking offsets,
+ * as well as around the optional timestamp assignment and watermark generation. 
+ * 
+ * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into
+ *            the Flink data streams.
+ * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
+ */
+public abstract class AbstractFetcher<T, KPH> {
+	
+	private static final int NO_TIMESTAMPS_WATERMARKS = 0;
+	private static final int PERIODIC_WATERMARKS = 1;
+	private static final int PUNCTUATED_WATERMARKS = 2;
+	
+	// ------------------------------------------------------------------------
+	
+	/** The source context to emit records and watermarks to */
+	private final SourceContext<T> sourceContext;
+
+	/** The lock that guarantees that record emission and state updates are atomic,
+	 * from the view of taking a checkpoint */
+	private final Object checkpointLock;
+
+	/** All partitions (and their state) that this fetcher is subscribed to */
+	private final KafkaTopicPartitionState<KPH>[] allPartitions;
+
+	/** The mode describing whether the fetcher also generates timestamps and watermarks */
+	private final int timestampWatermarkMode;
+	
+	/** Only relevant for punctuated watermarks: The current cross partition watermark */
+	private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
+
+	// ------------------------------------------------------------------------
+	
+	protected AbstractFetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> assignedPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			StreamingRuntimeContext runtimeContext) throws Exception
+	{
+		this.sourceContext = checkNotNull(sourceContext);
+		this.checkpointLock = sourceContext.getCheckpointLock();
+		
+		// figure out what we watermark mode we will be using
+		
+		if (watermarksPeriodic == null) {
+			if (watermarksPunctuated == null) {
+				// simple case, no watermarks involved
+				timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
+			} else {
+				timestampWatermarkMode = PUNCTUATED_WATERMARKS;
+			}
+		} else {
+			if (watermarksPunctuated == null) {
+				timestampWatermarkMode = PERIODIC_WATERMARKS;
+			} else {
+				throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
+			}
+		}
+		
+		// create our partition state according to the timestamp/watermark mode 
+		this.allPartitions = initializePartitions(
+				assignedPartitions,
+				timestampWatermarkMode,
+				watermarksPeriodic, watermarksPunctuated,
+				runtimeContext.getUserCodeClassLoader());
+		
+		// if we have periodic watermarks, kick off the interval scheduler
+		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+			KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = 
+					(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
+			
+			PeriodicWatermarkEmitter periodicEmitter = 
+					new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext);
+			periodicEmitter.start();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets all partitions (with partition state) that this fetcher is subscribed to.
+	 *
+	 * @return All subscribed partitions.
+	 */
+	protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
+		return allPartitions;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Core fetcher work methods
+	// ------------------------------------------------------------------------
+
+	public abstract void runFetchLoop() throws Exception;
+	
+	public abstract void cancel();
+
+	// ------------------------------------------------------------------------
+	//  Kafka version specifics
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Creates the Kafka version specific representation of the given
+	 * topic partition.
+	 * 
+	 * @param partition The Flink representation of the Kafka topic partition.
+	 * @return The specific Kafka representation of the Kafka topic partition.
+	 */
+	public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);
+
+	/**
+	 * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
+	 * older Kafka versions).
+	 * 
+	 * @param offsets The offsets to commit to Kafka.
+	 * @throws Exception This method forwards exceptions.
+	 */
+	public abstract void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
+	
+	// ------------------------------------------------------------------------
+	//  snapshot and restore the state
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Takes a snapshot of the partition offsets.
+	 * 
+	 * <p>Important: This method mus be called under the checkpoint lock.
+	 * 
+	 * @return A map from partition to current offset.
+	 */
+	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+		// this method assumes that the checkpoint lock is held
+		assert Thread.holdsLock(checkpointLock);
+
+		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
+		for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
+			if (partition.isOffsetDefined()) {
+				state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+			}
+		}
+		return state;
+	}
+
+	/**
+	 * Restores the partition offsets.
+	 * 
+	 * @param snapshotState The offsets for the partitions 
+	 */
+	public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) {
+		for (KafkaTopicPartitionState<?> partition : allPartitions) {
+			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
+			if (offset != null) {
+				partition.setOffset(offset);
+			}
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  emitting records
+	// ------------------------------------------------------------------------
+
+	/**
+	 * 
+	 * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
+	 * That makes the fast path efficient, the extended paths are called as separate methods.
+	 * 
+	 * @param record The record to emit
+	 * @param partitionState The state of the Kafka partition from which the record was fetched
+	 * @param offset The offset from which the record was fetched
+	 */
+	protected final void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) {
+		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+			// fast path logic, in case there are no watermarks
+
+			// emit the record, using the checkpoint lock to guarantee
+			// atomicity of record emission and offset state update
+			synchronized (checkpointLock) {
+				sourceContext.collect(record);
+				partitionState.setOffset(offset);
+			}
+		}
+		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset);
+		}
+		else {
+			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset);
+		}
+	}
+
+	/**
+	 * Record emission, if a timestamp will be attached from an assigner that is
+	 * also a periodic watermark generator.
+	 */
+	private void emitRecordWithTimestampAndPeriodicWatermark(
+			T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
+	{
+		@SuppressWarnings("unchecked")
+		final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
+				(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
+
+		// extract timestamp - this accesses/modifies the per-partition state inside the
+		// watermark generator instance, so we need to lock the access on the
+		// partition state. concurrent access can happen from the periodic emitter
+		final long timestamp;
+		//noinspection SynchronizationOnLocalVariableOrMethodParameter
+		synchronized (withWatermarksState) {
+			timestamp = withWatermarksState.getTimestampForRecord(record);
+		}
+
+		// emit the record with timestamp, using the usual checkpoint lock to guarantee
+		// atomicity of record emission and offset state update 
+		synchronized (checkpointLock) {
+			sourceContext.collectWithTimestamp(record, timestamp);
+			partitionState.setOffset(offset);
+		}
+	}
+
+	/**
+	 * Record emission, if a timestamp will be attached from an assigner that is
+	 * also a punctuated watermark generator.
+	 */
+	private void emitRecordWithTimestampAndPunctuatedWatermark(
+			T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
+	{
+		@SuppressWarnings("unchecked")
+		final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
+				(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
+
+		// only one thread ever works on accessing timestamps and watermarks
+		// from the punctuated extractor
+		final long timestamp = withWatermarksState.getTimestampForRecord(record);
+		final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
+			
+		// emit the record with timestamp, using the usual checkpoint lock to guarantee
+		// atomicity of record emission and offset state update 
+		synchronized (checkpointLock) {
+			sourceContext.collectWithTimestamp(record, timestamp);
+			partitionState.setOffset(offset);
+		}
+		
+		// if we also have a new per-partition watermark, check if that is also a
+		// new cross-partition watermark
+		if (newWatermark != null) {
+			updateMinPunctuatedWatermark(newWatermark);
+		}
+	}
+	/**
+	 *Checks whether a new per-partition watermark is also a new cross-partition watermark.
+	 */
+	private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
+		if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
+			long newMin = Long.MAX_VALUE;
+			
+			for (KafkaTopicPartitionState<?> state : allPartitions) {
+				@SuppressWarnings("unchecked")
+				final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
+						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
+				
+				newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
+			}
+			
+			// double-check locking pattern
+			if (newMin > maxWatermarkSoFar) {
+				synchronized (checkpointLock) {
+					if (newMin > maxWatermarkSoFar) {
+						maxWatermarkSoFar = newMin;
+						sourceContext.emitWatermark(new Watermark(newMin));
+					}
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Utility method that takes the topic partitions and creates the topic partition state
+	 * holders. If a watermark generator per partition exists, this will also initialize those.
+	 */
+	private KafkaTopicPartitionState<KPH>[] initializePartitions(
+			List<KafkaTopicPartition> assignedPartitions,
+			int timestampWatermarkMode,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			ClassLoader userCodeClassLoader)
+		throws IOException, ClassNotFoundException
+	{
+		@SuppressWarnings("unchecked")
+		KafkaTopicPartitionState<KPH>[] partitions =
+				(KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
+
+		int pos = 0;
+		for (KafkaTopicPartition partition : assignedPartitions) {
+			// create the kafka version specific partition handle
+			KPH kafkaHandle = createKafkaPartitionHandle(partition);
+			
+			// create the partition state
+			KafkaTopicPartitionState<KPH> partitionState;
+			switch (timestampWatermarkMode) {
+				case NO_TIMESTAMPS_WATERMARKS:
+					partitionState = new KafkaTopicPartitionState<>(partition, kafkaHandle);
+					break;
+				case PERIODIC_WATERMARKS: {
+					AssignerWithPeriodicWatermarks<T> assignerInstance =
+							watermarksPeriodic.deserializeValue(userCodeClassLoader);
+					partitionState = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+							partition, kafkaHandle, assignerInstance);
+					break;
+				}
+					
+				case PUNCTUATED_WATERMARKS: {
+					AssignerWithPunctuatedWatermarks<T> assignerInstance =
+							watermarksPunctuated.deserializeValue(userCodeClassLoader);
+					partitionState = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+							partition, kafkaHandle, assignerInstance);
+					break;
+				}
+				default:
+					// cannot happen, add this as a guard for the future
+					throw new RuntimeException();
+			}
+
+			partitions[pos++] = partitionState;
+		}
+		
+		return partitions;
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * The periodic watermark emitter. In its given interval, it checks all partitions for
+	 * the current event time watermark, and possibly emits the next watermark.
+	 */
+	private static class PeriodicWatermarkEmitter implements Triggerable {
+
+		private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
+		
+		private final SourceContext<?> emitter;
+		
+		private final StreamingRuntimeContext triggerContext;
+
+		private final long interval;
+		
+		private long lastWatermarkTimestamp;
+		
+		//-------------------------------------------------
+
+		PeriodicWatermarkEmitter(
+				KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
+				SourceContext<?> emitter,
+				StreamingRuntimeContext runtimeContext)
+		{
+			this.allPartitions = checkNotNull(allPartitions);
+			this.emitter = checkNotNull(emitter);
+			this.triggerContext = checkNotNull(runtimeContext);
+			this.interval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+			this.lastWatermarkTimestamp = Long.MIN_VALUE;
+		}
+
+		//-------------------------------------------------
+		
+		public void start() {
+			triggerContext.registerTimer(System.currentTimeMillis() + interval, this);
+		}
+		
+		@Override
+		public void trigger(long timestamp) throws Exception {
+			// sanity check
+			assert Thread.holdsLock(emitter.getCheckpointLock());
+			
+			long minAcrossAll = Long.MAX_VALUE;
+			for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
+				
+				// we access the current watermark for the periodic assigners under the state
+				// lock, to prevent concurrent modification to any internal variables
+				final long curr;
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
+				synchronized (state) {
+					curr = state.getCurrentWatermarkTimestamp();
+				}
+				
+				minAcrossAll = Math.min(minAcrossAll, curr);
+			}
+			
+			// emit next watermark, if there is one
+			if (minAcrossAll > lastWatermarkTimestamp) {
+				lastWatermarkTimestamp = minAcrossAll;
+				emitter.emitWatermark(new Watermark(minAcrossAll));
+			}
+			
+			// schedule the next watermark
+			triggerContext.registerTimer(System.currentTimeMillis() + interval, this);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
new file mode 100644
index 0000000..9a0e4e3
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * 
+ */
+public class ExceptionProxy {
+	
+	/** The thread that should be interrupted when an exception occurs */
+	private final Thread toInterrupt;
+	
+	/** The exception to throw */ 
+	private final AtomicReference<Throwable> exception;
+
+	/**
+	 * 
+	 * @param toInterrupt The thread to interrupt upon an exception. May be null.
+	 */
+	public ExceptionProxy(@Nullable Thread toInterrupt) {
+		this.toInterrupt = toInterrupt;
+		this.exception = new AtomicReference<>();
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Sets the exception occurred and interrupts the target thread,
+	 * if no other exception has occurred so far.
+	 * 
+	 * @param t The exception that occurred
+	 */
+	public void reportError(Throwable t) {
+		// set the exception, if it is the first
+		if (exception.compareAndSet(null, t) && toInterrupt != null) {
+			toInterrupt.interrupt();
+		}
+	}
+	
+	public void checkAndThrowException() throws Exception {
+		Throwable t = exception.get();
+		if (t != null) {
+			if (t instanceof Exception) {
+				throw (Exception) t;
+			}
+			else if (t instanceof Error) {
+				throw (Error) t;
+			}
+			else {
+				throw new Exception(t);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
deleted file mode 100644
index 11a392a..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.io.Serializable;
-
-public class KafkaPartitionState implements Serializable {
-
-	private static final long serialVersionUID = 722083576322742328L;
-
-	private final int partitionID;
-	private long offset;
-
-	private long maxTimestamp = Long.MIN_VALUE;
-	private boolean isActive = false;
-
-	public KafkaPartitionState(int id, long offset) {
-		this.partitionID = id;
-		this.offset = offset;
-	}
-
-	public void setOffset(long offset) {
-		this.offset = offset;
-	}
-
-	public void setActive(boolean isActive) {
-		this.isActive = isActive;
-	}
-
-	public void setMaxTimestamp(long timestamp) {
-		maxTimestamp = timestamp;
-	}
-
-	public int getPartition() {
-		return partitionID;
-	}
-
-	public boolean isActive() {
-		return isActive;
-	}
-
-	public long getMaxTimestamp() {
-		return maxTimestamp;
-	}
-
-	public long getOffset() {
-		return offset;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
index aea14cf..c68fe28 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -24,14 +24,20 @@ import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
-
 /**
- * A serializable representation of a kafka topic and a partition.
- * Used as an operator state for the Kafka consumer
+ * Flink's description of a partition in a Kafka topic.
+ * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, ...)
+ * 
+ * <p>Note: This class must not change in its structure, because it would change the
+ * serialization format and make previous savepoints unreadable.
  */
-public class KafkaTopicPartition implements Serializable {
+public final class KafkaTopicPartition implements Serializable {
 
+	/** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
+	 * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */
 	private static final long serialVersionUID = 722083576322742325L;
+	
+	// ------------------------------------------------------------------------
 
 	private final String topic;
 	private final int partition;
@@ -43,6 +49,8 @@ public class KafkaTopicPartition implements Serializable {
 		this.cachedHash = 31 * topic.hashCode() + partition;
 	}
 
+	// ------------------------------------------------------------------------
+	
 	public String getTopic() {
 		return topic;
 	}
@@ -51,6 +59,8 @@ public class KafkaTopicPartition implements Serializable {
 		return partition;
 	}
 
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public String toString() {
 		return "KafkaTopicPartition{" +
@@ -64,25 +74,23 @@ public class KafkaTopicPartition implements Serializable {
 		if (this == o) {
 			return true;
 		}
-		if (!(o instanceof KafkaTopicPartition)) {
-			return false;
+		else if (o instanceof KafkaTopicPartition) {
+			KafkaTopicPartition that = (KafkaTopicPartition) o;
+			return this.partition == that.partition && this.topic.equals(that.topic);
 		}
-
-		KafkaTopicPartition that = (KafkaTopicPartition) o;
-
-		if (partition != that.partition) {
+		else {
 			return false;
 		}
-		return topic.equals(that.topic);
 	}
 
 	@Override
 	public int hashCode() {
 		return cachedHash;
 	}
-
-
-	// ------------------- Utilities -------------------------------------
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
 
 	public static String toString(Map<KafkaTopicPartition, Long> map) {
 		StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
new file mode 100644
index 0000000..36612a4
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * The state that the Flink Kafka Consumer holds for each Kafka partition.
+ * Includes the Kafka descriptor for partitions.
+ * 
+ * <p>This class describes the most basic state (only the offset), subclasses
+ * define more elaborate state, containing current watermarks and timestamp
+ * extractors.
+ * 
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
+ */
+public class KafkaTopicPartitionState<KPH> {
+
+	/** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
+	 * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
+	public static final long OFFSET_NOT_SET = -915623761776L;
+	
+	// ------------------------------------------------------------------------
+
+	/** The Flink description of a Kafka partition */
+	private final KafkaTopicPartition partition;
+
+	/** The Kafka description of a Kafka partition (varies across different Kafka versions) */
+	private final KPH kafkaPartitionHandle;
+	
+	/** The offset within the Kafka partition that we already processed */
+	private volatile long offset;
+
+	// ------------------------------------------------------------------------
+	
+	public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
+		this.partition = partition;
+		this.kafkaPartitionHandle = kafkaPartitionHandle;
+		this.offset = OFFSET_NOT_SET;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets Flink's descriptor for the Kafka Partition.
+	 * @return The Flink partition descriptor.
+	 */
+	public final KafkaTopicPartition getKafkaTopicPartition() {
+		return partition;
+	}
+
+	/**
+	 * Gets Kafka's descriptor for the Kafka Partition.
+	 * @return The Kafka partition descriptor.
+	 */
+	public final KPH getKafkaPartitionHandle() {
+		return kafkaPartitionHandle;
+	}
+
+	public final String getTopic() {
+		return partition.getTopic();
+	}
+
+	public final int getPartition() {
+		return partition.getPartition();
+	}
+
+	/**
+	 * The current offset in the partition. This refers to the offset last element that
+	 * we retrieved and emitted successfully. It is the offset that should be stored in
+	 * a checkpoint.
+	 */
+	public final long getOffset() {
+		return offset;
+	}
+
+	public final void setOffset(long offset) {
+		this.offset = offset;
+	}
+	
+	public final boolean isOffsetDefined() {
+		return offset != OFFSET_NOT_SET;
+	}
+	
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "Partition: " + partition + ", KafkaPartitionHandle=" + kafkaPartitionHandle
+				+ ", offset=" + (isOffsetDefined() ? String.valueOf(offset) : "(not set)");
+	}
+}