You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/01 07:35:56 UTC

[05/14] flink git commit: [FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index cfd7c3b..3bed0b8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -62,7 +64,18 @@ public abstract class AbstractFetcher<T, KPH> {
 	protected final Object checkpointLock;
 
 	/** All partitions (and their state) that this fetcher is subscribed to. */
-	private final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates;
+	private final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates;
+
+	/**
+	 * Queue of partitions that are not yet assigned to any Kafka clients for consuming.
+	 * Kafka version-specific implementations of {@link AbstractFetcher#runFetchLoop()}
+	 * should continuously poll this queue for unassigned partitions, and start consuming
+	 * them accordingly.
+	 *
+	 * <p>All partitions added to this queue are guaranteed to have been added
+	 * to {@link #subscribedPartitionStates} already.
+	 */
+	protected final ClosableBlockingQueue<KafkaTopicPartitionState<KPH>> unassignedPartitionsQueue;
 
 	/** The mode describing whether the fetcher also generates timestamps and watermarks. */
 	protected final int timestampWatermarkMode;
@@ -70,6 +83,23 @@ public abstract class AbstractFetcher<T, KPH> {
 	/** Flag whether to register metrics for the fetcher. */
 	protected final boolean useMetrics;
 
+	/**
+	 * Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+	 * to exploit per-partition timestamp characteristics.
+	 * The assigner is kept in serialized form, to deserialize it into multiple copies.
+	 */
+	private final SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic;
+
+	/**
+	 * Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+	 * to exploit per-partition timestamp characteristics.
+	 * The assigner is kept in serialized form, to deserialize it into multiple copies.
+	 */
+	private final SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated;
+
+	/** User class loader used to deserialize watermark assigners. */
+	private final ClassLoader userCodeClassLoader;
+
 	/** Only relevant for punctuated watermarks: The current cross partition watermark. */
 	private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
 
@@ -77,7 +107,7 @@ public abstract class AbstractFetcher<T, KPH> {
 
 	protected AbstractFetcher(
 			SourceContext<T> sourceContext,
-			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+			Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			ProcessingTimeService processingTimeProvider,
@@ -87,8 +117,11 @@ public abstract class AbstractFetcher<T, KPH> {
 		this.sourceContext = checkNotNull(sourceContext);
 		this.checkpointLock = sourceContext.getCheckpointLock();
 		this.useMetrics = useMetrics;
+		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
 
 		// figure out what we watermark mode we will be using
+		this.watermarksPeriodic = watermarksPeriodic;
+		this.watermarksPunctuated = watermarksPunctuated;
 
 		if (watermarksPeriodic == null) {
 			if (watermarksPunctuated == null) {
@@ -105,31 +138,69 @@ public abstract class AbstractFetcher<T, KPH> {
 			}
 		}
 
-		// create our partition state according to the timestamp/watermark mode
-		this.subscribedPartitionStates = initializeSubscribedPartitionStates(
-				assignedPartitionsWithInitialOffsets,
+		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
+
+		// initialize subscribed partition states with seed partitions
+		this.subscribedPartitionStates = createPartitionStateHolders(
+				seedPartitionsWithInitialOffsets,
 				timestampWatermarkMode,
-				watermarksPeriodic, watermarksPunctuated,
+				watermarksPeriodic,
+				watermarksPunctuated,
 				userCodeClassLoader);
 
-		// check that all partition states have a defined offset
+		// check that all seed partition states have a defined offset
 		for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
 			if (!partitionState.isOffsetDefined()) {
-				throw new IllegalArgumentException("The fetcher was assigned partitions with undefined initial offsets.");
+				throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
 			}
 		}
 
+		// all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
+		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
+			unassignedPartitionsQueue.add(partition);
+		}
+
 		// if we have periodic watermarks, kick off the interval scheduler
 		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts =
-					(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) subscribedPartitionStates;
+			@SuppressWarnings("unchecked")
+			PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
+					subscribedPartitionStates,
+					sourceContext,
+					processingTimeProvider,
+					autoWatermarkInterval);
 
-			PeriodicWatermarkEmitter periodicEmitter =
-					new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
 			periodicEmitter.start();
 		}
 	}
 
+	/**
+	 * Adds a list of newly discovered partitions to the fetcher for consuming.
+	 *
+	 * <p>This method creates the partition state holder for each new partition, using
+	 * {@link KafkaTopicPartitionStateSentinel#EARLIEST_OFFSET} as the starting offset.
+	 * It uses the earliest offset because there may be delay in discovering a partition
+	 * after it was created and started receiving records.
+	 *
+	 * <p>After the state representation for a partition is created, it is added to the
+	 * unassigned partitions queue to await to be consumed.
+	 *
+	 * @param newPartitions discovered partitions to add
+	 */
+	public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) throws IOException, ClassNotFoundException {
+		List<KafkaTopicPartitionState<KPH>> newPartitionStates = createPartitionStateHolders(
+				newPartitions,
+				KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET,
+				timestampWatermarkMode,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				userCodeClassLoader);
+
+		for (KafkaTopicPartitionState<KPH> newPartitionState : newPartitionStates) {
+			subscribedPartitionStates.add(newPartitionState);
+			unassignedPartitionsQueue.add(newPartitionState);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Properties
 	// ------------------------------------------------------------------------
@@ -139,7 +210,7 @@ public abstract class AbstractFetcher<T, KPH> {
 	 *
 	 * @return All subscribed partitions.
 	 */
-	protected final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates() {
+	protected final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates() {
 		return subscribedPartitionStates;
 	}
 
@@ -156,15 +227,6 @@ public abstract class AbstractFetcher<T, KPH> {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates the Kafka version specific representation of the given
-	 * topic partition.
-	 *
-	 * @param partition The Flink representation of the Kafka topic partition.
-	 * @return The specific Kafka representation of the Kafka topic partition.
-	 */
-	public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);
-
-	/**
 	 * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
 	 * older Kafka versions). This method is only ever called when the offset commit mode of
 	 * the consumer is {@link OffsetCommitMode#ON_CHECKPOINTS}.
@@ -179,6 +241,15 @@ public abstract class AbstractFetcher<T, KPH> {
 	 */
 	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
 
+	/**
+	 * Creates the Kafka version specific representation of the given
+	 * topic partition.
+	 *
+	 * @param partition The Flink representation of the Kafka topic partition.
+	 * @return The version-specific Kafka representation of the Kafka topic partition.
+	 */
+	protected abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);
+
 	// ------------------------------------------------------------------------
 	//  snapshot and restore the state
 	// ------------------------------------------------------------------------
@@ -186,7 +257,7 @@ public abstract class AbstractFetcher<T, KPH> {
 	/**
 	 * Takes a snapshot of the partition offsets.
 	 *
-	 * <p>Important: This method mus be called under the checkpoint lock.
+	 * <p>Important: This method must be called under the checkpoint lock.
 	 *
 	 * @return A map from partition to current offset.
 	 */
@@ -194,8 +265,8 @@ public abstract class AbstractFetcher<T, KPH> {
 		// this method assumes that the checkpoint lock is held
 		assert Thread.holdsLock(checkpointLock);
 
-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(subscribedPartitionStates.length);
-		for (KafkaTopicPartitionState<?> partition : subscribedPartitionStates()) {
+		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(subscribedPartitionStates.size());
+		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
 			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
 		}
 		return state;
@@ -365,78 +436,73 @@ public abstract class AbstractFetcher<T, KPH> {
 
 	/**
 	 * Utility method that takes the topic partitions and creates the topic partition state
-	 * holders. If a watermark generator per partition exists, this will also initialize those.
+	 * holders, depending on the timestamp / watermark mode.
 	 */
-	private KafkaTopicPartitionState<KPH>[] initializeSubscribedPartitionStates(
-			Map<KafkaTopicPartition, Long> assignedPartitionsToInitialOffsets,
+	private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(
+			Map<KafkaTopicPartition, Long> partitionsToInitialOffsets,
 			int timestampWatermarkMode,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
-		switch (timestampWatermarkMode) {
 
-			case NO_TIMESTAMPS_WATERMARKS: {
-				@SuppressWarnings("unchecked")
-				KafkaTopicPartitionState<KPH>[] partitions =
-						(KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitionsToInitialOffsets.size()];
+		List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
 
-				int pos = 0;
-				for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
+		switch (timestampWatermarkMode) {
+			case NO_TIMESTAMPS_WATERMARKS: {
+				for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
 					// create the kafka version specific partition handle
-					KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
-					partitions[pos] = new KafkaTopicPartitionState<>(partition.getKey(), kafkaHandle);
-					partitions[pos].setOffset(partition.getValue());
+					KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
+
+					KafkaTopicPartitionState<KPH> partitionState =
+							new KafkaTopicPartitionState<>(partitionEntry.getKey(), kafkaHandle);
+					partitionState.setOffset(partitionEntry.getValue());
 
-					pos++;
+					partitionStates.add(partitionState);
 				}
 
-				return partitions;
+				return partitionStates;
 			}
 
 			case PERIODIC_WATERMARKS: {
-				@SuppressWarnings("unchecked")
-				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
-						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
-								new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()];
-
-				int pos = 0;
-				for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
-					KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
+				for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
+					KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
 
 					AssignerWithPeriodicWatermarks<T> assignerInstance =
 							watermarksPeriodic.deserializeValue(userCodeClassLoader);
 
-					partitions[pos] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
-							partition.getKey(), kafkaHandle, assignerInstance);
-					partitions[pos].setOffset(partition.getValue());
+					KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partitionState =
+							new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+									partitionEntry.getKey(),
+									kafkaHandle,
+									assignerInstance);
 
-					pos++;
+					partitionState.setOffset(partitionEntry.getValue());
+
+					partitionStates.add(partitionState);
 				}
 
-				return partitions;
+				return partitionStates;
 			}
 
 			case PUNCTUATED_WATERMARKS: {
-				@SuppressWarnings("unchecked")
-				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
-						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
-								new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()];
-
-				int pos = 0;
-				for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
-					KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
+				for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
+					KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
 
 					AssignerWithPunctuatedWatermarks<T> assignerInstance =
 							watermarksPunctuated.deserializeValue(userCodeClassLoader);
 
-					partitions[pos] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
-							partition.getKey(), kafkaHandle, assignerInstance);
-					partitions[pos].setOffset(partition.getValue());
+					KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partitionState =
+							new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+									partitionEntry.getKey(),
+									kafkaHandle,
+									assignerInstance);
+
+					partitionState.setOffset(partitionEntry.getValue());
 
-					pos++;
+					partitionStates.add(partitionState);
 				}
 
-				return partitions;
+				return partitionStates;
 			}
 			default:
 				// cannot happen, add this as a guard for the future
@@ -444,6 +510,31 @@ public abstract class AbstractFetcher<T, KPH> {
 		}
 	}
 
+	/**
+	 * Shortcut variant of {@link #createPartitionStateHolders(Map, int, SerializedValue, SerializedValue, ClassLoader)}
+	 * that uses the same offset for all partitions when creating their state holders.
+	 */
+	private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(
+		List<KafkaTopicPartition> partitions,
+		long initialOffset,
+		int timestampWatermarkMode,
+		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+		ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
+
+		Map<KafkaTopicPartition, Long> partitionsToInitialOffset = new HashMap<>(partitions.size());
+		for (KafkaTopicPartition partition : partitions) {
+			partitionsToInitialOffset.put(partition, initialOffset);
+		}
+
+		return createPartitionStateHolders(
+				partitionsToInitialOffset,
+				timestampWatermarkMode,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				userCodeClassLoader);
+	}
+
 	// ------------------------- Metrics ----------------------------------
 
 	/**
@@ -455,7 +546,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		// add current offsets to gage
 		MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
 		MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
-		for (KafkaTopicPartitionState<?> ktp: subscribedPartitionStates()) {
+		for (KafkaTopicPartitionState<KPH> ktp : subscribedPartitionStates) {
 			currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
 			committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
 		}
@@ -500,9 +591,9 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * The periodic watermark emitter. In its given interval, it checks all partitions for
 	 * the current event time watermark, and possibly emits the next watermark.
 	 */
-	private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
+	private static class PeriodicWatermarkEmitter<KPH> implements ProcessingTimeCallback {
 
-		private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
+		private final List<KafkaTopicPartitionState<KPH>> allPartitions;
 
 		private final SourceContext<?> emitter;
 
@@ -515,7 +606,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		//-------------------------------------------------
 
 		PeriodicWatermarkEmitter(
-				KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
+				List<KafkaTopicPartitionState<KPH>> allPartitions,
 				SourceContext<?> emitter,
 				ProcessingTimeService timerService,
 				long autoWatermarkInterval) {
@@ -536,14 +627,14 @@ public abstract class AbstractFetcher<T, KPH> {
 		public void onProcessingTime(long timestamp) throws Exception {
 
 			long minAcrossAll = Long.MAX_VALUE;
-			for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
+			for (KafkaTopicPartitionState<?> state : allPartitions) {
 
 				// we access the current watermark for the periodic assigners under the state
 				// lock, to prevent concurrent modification to any internal variables
 				final long curr;
 				//noinspection SynchronizationOnLocalVariableOrMethodParameter
 				synchronized (state) {
-					curr = state.getCurrentWatermarkTimestamp();
+					curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>) state).getCurrentWatermarkTimestamp();
 				}
 
 				minAcrossAll = Math.min(minAcrossAll, curr);

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
new file mode 100644
index 0000000..d55099a
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all partition discoverers.
+ *
+ * <p>This partition discoverer base class implements the logic around bookkeeping
+ * discovered partitions, and using the information to determine whether or not there
+ * are new partitions that the consumer subtask should subscribe to.
+ *
+ * <p>Subclass implementations should simply implement the logic of using the version-specific
+ * Kafka clients to fetch topic and partition metadata.
+ *
+ * <p>Since Kafka clients are generally not thread-safe, partition discoverers should
+ * not be concurrently accessed. The only exception for this would be the {@link #wakeup()}
+ * call, which allows the discoverer to be interrupted during a {@link #discoverPartitions()} call.
+ */
+public abstract class AbstractPartitionDiscoverer {
+
+	/** Describes whether we are discovering partitions for fixed topics or a topic pattern. */
+	private final KafkaTopicsDescriptor topicsDescriptor;
+
+	/** Index of the consumer subtask that this partition discoverer belongs to. */
+	private final int indexOfThisSubtask;
+
+	/** The total number of consumer subtasks. */
+	private final int numParallelSubtasks;
+
+	/** Flag to determine whether or not the discoverer is closed. */
+	private volatile boolean closed = true;
+
+	/**
+	 * Flag to determine whether or not the discoverer had been woken up.
+	 * When set to {@code true}, {@link #discoverPartitions()} would be interrupted as early as possible.
+	 * Once interrupted, the flag is reset.
+	 */
+	private volatile boolean wakeup;
+
+	/**
+	 * Map of topics to they're largest discovered partition id seen by this subtask.
+	 * This state may be updated whenever {@link AbstractPartitionDiscoverer#discoverPartitions()} or
+	 * {@link AbstractPartitionDiscoverer#setAndCheckDiscoveredPartition(KafkaTopicPartition)} is called.
+	 *
+	 * <p>This is used to remove old partitions from the fetched partition lists. It is sufficient
+	 * to keep track of only the largest partition id because Kafka partition numbers are only
+	 * allowed to be increased and has incremental ids.
+	 */
+	private Map<String, Integer> topicsToLargestDiscoveredPartitionId;
+
+	public AbstractPartitionDiscoverer(
+			KafkaTopicsDescriptor topicsDescriptor,
+			int indexOfThisSubtask,
+			int numParallelSubtasks) {
+
+		this.topicsDescriptor = checkNotNull(topicsDescriptor);
+		this.indexOfThisSubtask = indexOfThisSubtask;
+		this.numParallelSubtasks = numParallelSubtasks;
+		this.topicsToLargestDiscoveredPartitionId = new HashMap<>();
+	}
+
+	/**
+	 * Opens the partition discoverer, initializing all required Kafka connections.
+	 *
+	 * <p>NOTE: thread-safety is not guaranteed.
+	 */
+	public void open() throws Exception {
+		closed = false;
+		initializeConnections();
+	}
+
+	/**
+	 * Closes the partition discoverer, cleaning up all Kafka connections.
+	 *
+	 * <p>NOTE: thread-safety is not guaranteed.
+	 */
+	public void close() throws Exception {
+		closed = true;
+		closeConnections();
+	}
+
+	/**
+	 * Interrupt an in-progress discovery attempt by throwing a {@link WakeupException}.
+	 * If no attempt is in progress, the immediate next attempt will throw a {@link WakeupException}.
+	 *
+	 * <p>This method can be called concurrently from a different thread.
+	 */
+	public void wakeup() {
+		wakeup = true;
+		wakeupConnections();
+	}
+
+	/**
+	 * Execute a partition discovery attempt for this subtask.
+	 * This method lets the partition discoverer update what partitions it has discovered so far.
+	 *
+	 * @return List of discovered new partitions that this subtask should subscribe to.
+	 */
+	public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
+		if (!closed && !wakeup) {
+			try {
+				List<KafkaTopicPartition> newDiscoveredPartitions;
+
+				// (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
+				if (topicsDescriptor.isFixedTopics()) {
+					newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
+				} else {
+					List<String> matchedTopics = getAllTopics();
+
+					// retain topics that match the pattern
+					Iterator<String> iter = matchedTopics.iterator();
+					while (iter.hasNext()) {
+						if (!topicsDescriptor.getTopicPattern().matcher(iter.next()).matches()) {
+							iter.remove();
+						}
+					}
+
+					if (matchedTopics.size() != 0) {
+						// get partitions only for matched topics
+						newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
+					} else {
+						newDiscoveredPartitions = null;
+					}
+				}
+
+				// (2) eliminate partition that are old partitions or should not be subscribed by this subtask
+				if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
+					throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
+				} else {
+					// sort so that we make sure the topicsToLargestDiscoveredPartitionId state is updated
+					// with incremental partition ids of the same topics (otherwise some partition ids may be skipped)
+					KafkaTopicPartition.sort(newDiscoveredPartitions);
+
+					Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
+					KafkaTopicPartition nextPartition;
+					while (iter.hasNext()) {
+						nextPartition = iter.next();
+						if (!setAndCheckDiscoveredPartition(nextPartition)) {
+							iter.remove();
+						}
+					}
+				}
+
+				return newDiscoveredPartitions;
+			} catch (WakeupException e) {
+				// the actual topic / partition metadata fetching methods
+				// may be woken up midway; reset the wakeup flag and rethrow
+				wakeup = false;
+				throw e;
+			}
+		} else if (!closed && wakeup) {
+			// may have been woken up before the method call
+			wakeup = false;
+			throw new WakeupException();
+		} else {
+			throw new ClosedException();
+		}
+	}
+
+	/**
+	 * Sets a partition as discovered. Partitions are considered as new
+	 * if its partition id is larger than all partition ids previously
+	 * seen for the topic it belongs to. Therefore, for a set of
+	 * discovered partitions, the order that this method is invoked with
+	 * each partition is important.
+	 *
+	 * <p>If the partition is indeed newly discovered, this method also returns
+	 * whether the new partition should be subscribed by this subtask.
+	 *
+	 * @param partition the partition to set and check
+	 *
+	 * @return {@code true}, if the partition wasn't seen before and should
+	 *         be subscribed by this subtask; {@code false} otherwise
+	 */
+	public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
+		if (isUndiscoveredPartition(partition)) {
+				topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());
+
+			return shouldAssignToThisSubtask(partition, indexOfThisSubtask, numParallelSubtasks);
+		}
+
+		return false;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Kafka version specifics
+	// ------------------------------------------------------------------------
+
+	/** Establish the required connections in order to fetch topics and partitions metadata. */
+	protected abstract void initializeConnections() throws Exception;
+
+	/**
+	 * Attempt to eagerly wakeup from blocking calls to Kafka in {@link AbstractPartitionDiscoverer#getAllTopics()}
+	 * and {@link AbstractPartitionDiscoverer#getAllPartitionsForTopics(List)}.
+	 *
+	 * <p>If the invocation indeed results in interrupting an actual blocking Kafka call, the implementations
+	 * of {@link AbstractPartitionDiscoverer#getAllTopics()} and
+	 * {@link AbstractPartitionDiscoverer#getAllPartitionsForTopics(List)} are responsible of throwing a
+	 * {@link WakeupException}.
+	 */
+	protected abstract void wakeupConnections();
+
+	/** Close all established connections. */
+	protected abstract void closeConnections() throws Exception;
+
+	/** Fetch the list of all topics from Kafka. */
+	protected abstract List<String> getAllTopics() throws WakeupException;
+
+	/** Fetch the list of all partitions for a specific topics list from Kafka. */
+	protected abstract List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws WakeupException;
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/** Signaling exception to indicate that an actual Kafka call was interrupted. */
+	public static final class WakeupException extends Exception {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Thrown if this discoverer was used to discover partitions after it was closed. */
+	public static final class ClosedException extends Exception {
+		private static final long serialVersionUID = 1L;
+	}
+
+	private boolean isUndiscoveredPartition(KafkaTopicPartition partition) {
+		return !topicsToLargestDiscoveredPartitionId.containsKey(partition.getTopic())
+			||  partition.getPartition() > topicsToLargestDiscoveredPartitionId.get(partition.getTopic());
+	}
+
+	public static boolean shouldAssignToThisSubtask(KafkaTopicPartition partition, int indexOfThisSubtask, int numParallelSubtasks) {
+		return Math.abs(partition.hashCode() % numParallelSubtasks) == indexOfThisSubtask;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
new file mode 100644
index 0000000..da61dd0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A special form of blocking queue with two additions:
+ * <ol>
+ *     <li>The queue can be closed atomically when empty. Adding elements after the queue
+ *         is closed fails. This allows queue consumers to atomically discover that no elements
+ *         are available and mark themselves as shut down.</li>
+ *     <li>The queue allows to poll batches of elements in one polling call.</li>
+ * </ol>
+ *
+ * <p>The queue has no capacity restriction and is safe for multiple producers and consumers.
+ *
+ * <p>Note: Null elements are prohibited.
+ *
+ * @param <E> The type of elements in the queue.
+ */
+public class ClosableBlockingQueue<E> {
+
+	/** The lock used to make queue accesses and open checks atomic. */
+	private final ReentrantLock lock;
+
+	/** The condition on which blocking get-calls wait if the queue is empty. */
+	private final Condition nonEmpty;
+
+	/** The deque of elements. */
+	private final ArrayDeque<E> elements;
+
+	/** Flag marking the status of the queue. */
+	private volatile boolean open;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new empty queue.
+	 */
+	public ClosableBlockingQueue() {
+		this(10);
+	}
+
+	/**
+	 * Creates a new empty queue, reserving space for at least the specified number
+	 * of elements. The queu can still grow, of more elements are added than the
+	 * reserved space.
+	 *
+	 * @param initialSize The number of elements to reserve space for.
+	 */
+	public ClosableBlockingQueue(int initialSize) {
+		this.lock = new ReentrantLock(true);
+		this.nonEmpty = this.lock.newCondition();
+
+		this.elements = new ArrayDeque<>(initialSize);
+		this.open = true;
+
+	}
+
+	/**
+	 * Creates a new queue that contains the given elements.
+	 *
+	 * @param initialElements The elements to initially add to the queue.
+	 */
+	public ClosableBlockingQueue(Collection<? extends E> initialElements) {
+		this(initialElements.size());
+		this.elements.addAll(initialElements);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Size and status
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the number of elements currently in the queue.
+	 * @return The number of elements currently in the queue.
+	 */
+	public int size() {
+		lock.lock();
+		try {
+			return elements.size();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Checks whether the queue is empty (has no elements).
+	 * @return True, if the queue is empty; false, if it is non-empty.
+	 */
+	public boolean isEmpty() {
+		return size() == 0;
+	}
+
+	/**
+	 * Checks whether the queue is currently open, meaning elements can be added and polled.
+	 * @return True, if the queue is open; false, if it is closed.
+	 */
+	public boolean isOpen() {
+		return open;
+	}
+
+	/**
+	 * Tries to close the queue. Closing the queue only succeeds when no elements are
+	 * in the queue when this method is called. Checking whether the queue is empty, and
+	 * marking the queue as closed is one atomic operation.
+	 *
+	 * @return True, if the queue is closed, false if the queue remains open.
+	 */
+	public boolean close() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.isEmpty()) {
+					open = false;
+					nonEmpty.signalAll();
+					return true;
+				} else {
+					return false;
+				}
+			}
+			else {
+				// already closed
+				return true;
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Adding / Removing elements
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Tries to add an element to the queue, if the queue is still open. Checking whether the queue
+	 * is open and adding the element is one atomic operation.
+	 *
+	 * <p>Unlike the {@link #add(Object)} method, this method never throws an exception,
+	 * but only indicates via the return code if the element was added or the
+	 * queue was closed.
+	 *
+	 * @param element The element to add.
+	 * @return True, if the element was added, false if the queue was closes.
+	 */
+	public boolean addIfOpen(E element) {
+		requireNonNull(element);
+
+		lock.lock();
+		try {
+			if (open) {
+				elements.addLast(element);
+				if (elements.size() == 1) {
+					nonEmpty.signalAll();
+				}
+			}
+			return open;
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Adds the element to the queue, or fails with an exception, if the queue is closed.
+	 * Checking whether the queue is open and adding the element is one atomic operation.
+	 *
+	 * @param element The element to add.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public void add(E element) throws IllegalStateException {
+		requireNonNull(element);
+
+		lock.lock();
+		try {
+			if (open) {
+				elements.addLast(element);
+				if (elements.size() == 1) {
+					nonEmpty.signalAll();
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the queue's next element without removing it, if the queue is non-empty.
+	 * Otherwise, returns null.
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and getting the next element is one atomic operation.
+	 *
+	 * <p>This method never blocks.
+	 *
+	 * @return The queue's next element, or null, if the queue is empty.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public E peek() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.size() > 0) {
+					return elements.getFirst();
+				} else {
+					return null;
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the queue's next element and removes it, the queue is non-empty.
+	 * Otherwise, this method returns null.
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 *
+	 * <p>This method never blocks.
+	 *
+	 * @return The queue's next element, or null, if the queue is empty.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public E poll() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.size() > 0) {
+					return elements.removeFirst();
+				} else {
+					return null;
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns all of the queue's current elements in a list, if the queue is non-empty.
+	 * Otherwise, this method returns null.
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the elements is one atomic operation.
+	 *
+	 * <p>This method never blocks.
+	 *
+	 * @return All of the queue's elements, or null, if the queue is empty.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public List<E> pollBatch() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.size() > 0) {
+					ArrayList<E> result = new ArrayList<>(elements);
+					elements.clear();
+					return result;
+				} else {
+					return null;
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the next element in the queue. If the queue is empty, this method
+	 * waits until at least one element is added.
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 *
+	 * @return The next element in the queue, never null.
+	 *
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public E getElementBlocking() throws InterruptedException {
+		lock.lock();
+		try {
+			while (open && elements.isEmpty()) {
+				nonEmpty.await();
+			}
+
+			if (open) {
+				return elements.removeFirst();
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the next element in the queue. If the queue is empty, this method
+	 * waits at most a certain time until an element becomes available. If no element
+	 * is available after that time, the method returns null.
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 *
+	 * @param timeoutMillis The number of milliseconds to block, at most.
+	 * @return The next element in the queue, or null, if the timeout expires  before an element is available.
+	 *
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public E getElementBlocking(long timeoutMillis) throws InterruptedException {
+		if (timeoutMillis == 0L) {
+			// wait forever case
+			return getElementBlocking();
+		} else if (timeoutMillis < 0L) {
+			throw new IllegalArgumentException("invalid timeout");
+		}
+
+		final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
+
+		lock.lock();
+		try {
+			while (open && elements.isEmpty() && timeoutMillis > 0) {
+				nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
+				timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
+			}
+
+			if (!open) {
+				throw new IllegalStateException("queue is closed");
+			}
+			else if (elements.isEmpty()) {
+				return null;
+			} else {
+				return elements.removeFirst();
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Gets all the elements found in the list, or blocks until at least one element
+	 * was added. If the queue is empty when this method is called, it blocks until
+	 * at least one element is added.
+	 *
+	 * <p>This method always returns a list with at least one element.
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 *
+	 * @return A list with all elements in the queue, always at least one element.
+	 *
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public List<E> getBatchBlocking() throws InterruptedException {
+		lock.lock();
+		try {
+			while (open && elements.isEmpty()) {
+				nonEmpty.await();
+			}
+			if (open) {
+				ArrayList<E> result = new ArrayList<>(elements);
+				elements.clear();
+				return result;
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Gets all the elements found in the list, or blocks until at least one element
+	 * was added. This method is similar as {@link #getBatchBlocking()}, but takes
+	 * a number of milliseconds that the method will maximally wait before returning.
+	 *
+	 * <p>This method never returns null, but an empty list, if the queue is empty when
+	 * the method is called and the request times out before an element was added.
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 *
+	 * @param timeoutMillis The number of milliseconds to wait, at most.
+	 * @return A list with all elements in the queue, possible an empty list.
+	 *
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException {
+		if (timeoutMillis == 0L) {
+			// wait forever case
+			return getBatchBlocking();
+		} else if (timeoutMillis < 0L) {
+			throw new IllegalArgumentException("invalid timeout");
+		}
+
+		final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
+
+		lock.lock();
+		try {
+			while (open && elements.isEmpty() && timeoutMillis > 0) {
+				nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
+				timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
+			}
+
+			if (!open) {
+				throw new IllegalStateException("queue is closed");
+			}
+			else if (elements.isEmpty()) {
+				return Collections.emptyList();
+			}
+			else {
+				ArrayList<E> result = new ArrayList<>(elements);
+				elements.clear();
+				return result;
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Standard Utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		int hashCode = 17;
+		for (E element : elements) {
+			hashCode = 31 * hashCode + element.hashCode();
+		}
+		return hashCode;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		} else if (obj != null && obj.getClass() == ClosableBlockingQueue.class) {
+			@SuppressWarnings("unchecked")
+			ClosableBlockingQueue<E> that = (ClosableBlockingQueue<E>) obj;
+
+			if (this.elements.size() == that.elements.size()) {
+				Iterator<E> thisElements = this.elements.iterator();
+				for (E thatNext : that.elements) {
+					E thisNext = thisElements.next();
+					if (!(thisNext == null ? thatNext == null : thisNext.equals(thatNext))) {
+						return false;
+					}
+				}
+				return true;
+			} else {
+				return false;
+			}
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return elements.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
index f3645e3..3500cd8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka.internals;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -116,4 +117,22 @@ public final class KafkaTopicPartition implements Serializable {
 		}
 		return ret;
 	}
+
+	/**
+	 * A {@link java.util.Comparator} for {@link KafkaTopicPartition}s.
+	 */
+	public static class Comparator implements java.util.Comparator<KafkaTopicPartition> {
+		@Override
+		public int compare(KafkaTopicPartition p1, KafkaTopicPartition p2) {
+			if (!p1.getTopic().equals(p2.getTopic())) {
+				return p1.getTopic().compareTo(p2.getTopic());
+			} else {
+				return Integer.compare(p1.getPartition(), p2.getPartition());
+			}
+		}
+	}
+
+	public static void sort(List<KafkaTopicPartition> partitions) {
+		Collections.sort(partitions, new Comparator());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
new file mode 100644
index 0000000..9a81ea8
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A Kafka Topics Descriptor describes how the consumer subscribes to Kafka topics -
+ * either a fixed list of topics, or a topic pattern.
+ */
+public class KafkaTopicsDescriptor implements Serializable {
+
+	private static final long serialVersionUID = -3807227764764900975L;
+
+	private final List<String> fixedTopics;
+	private final Pattern topicPattern;
+
+	public KafkaTopicsDescriptor(@Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {
+		checkArgument((fixedTopics != null && topicPattern == null) || (fixedTopics == null && topicPattern != null),
+			"Exactly one of either fixedTopics or topicPattern must be specified.");
+
+		if (fixedTopics != null) {
+			checkArgument(!fixedTopics.isEmpty(), "If subscribing to a fixed topics list, the supplied list cannot be empty.");
+		}
+
+		this.fixedTopics = fixedTopics;
+		this.topicPattern = topicPattern;
+	}
+
+	public boolean isFixedTopics() {
+		return fixedTopics != null;
+	}
+
+	public boolean isTopicPattern() {
+		return topicPattern != null;
+	}
+
+	public List<String> getFixedTopics() {
+		return fixedTopics;
+	}
+
+	public Pattern getTopicPattern() {
+		return topicPattern;
+	}
+
+	@Override
+	public String toString() {
+		return (fixedTopics == null)
+			? "Topic Regex Pattern (" + topicPattern.pattern() + ")"
+			: "Fixed Topics (" + fixedTopics + ")";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 692dc1d..e3f337e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -28,8 +28,10 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
@@ -54,10 +56,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doAnswer;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * Tests for checking whether {@link FlinkKafkaConsumerBase} can restore from snapshots that were
@@ -122,7 +128,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 
 		final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
 
-		final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(fetcher, partitions);
+		final DummyFlinkKafkaConsumer<String> consumerFunction =
+			new DummyFlinkKafkaConsumer<>(fetcher, partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
 
 		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
 				new StreamSource<>(consumerFunction);
@@ -178,7 +185,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 	@Test
 	public void testRestoreFromEmptyStateNoPartitions() throws Exception {
 		final DummyFlinkKafkaConsumer<String> consumerFunction =
-				new DummyFlinkKafkaConsumer<>(Collections.<KafkaTopicPartition>emptyList());
+				new DummyFlinkKafkaConsumer<>(
+					Collections.<KafkaTopicPartition>emptyList(),
+					FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
 
 		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction);
 
@@ -203,7 +212,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
 
 		// assert that no state was restored
-		assertTrue(consumerFunction.getRestoredState() == null);
+		assertTrue(consumerFunction.getRestoredState().isEmpty());
 
 		consumerOperator.close();
 		consumerOperator.cancel();
@@ -217,7 +226,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 	public void testRestoreFromEmptyStateWithPartitions() throws Exception {
 		final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
 
-		final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(partitions);
+		final DummyFlinkKafkaConsumer<String> consumerFunction =
+			new DummyFlinkKafkaConsumer<>(partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
 
 		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
 				new StreamSource<>(consumerFunction);
@@ -238,19 +248,25 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 
 		testHarness.open();
 
-		// the expected state in "kafka-consumer-migration-test-flink*-empty-state-snapshot";
-		// since the state is empty, the consumer should reflect on the startup mode to determine start offsets.
+		// the expected state in "kafka-consumer-migration-test-flink1.2-snapshot-empty-state";
+		// all new partitions after the snapshot are considered as partitions that were created while the
+		// consumer wasn't running, and should start from the earliest offset.
 		final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
 		for (KafkaTopicPartition partition : PARTITION_STATE.keySet()) {
-			expectedSubscribedPartitionsWithStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+			expectedSubscribedPartitionsWithStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
 		}
 
 		// assert that there are partitions and is identical to expected list
 		assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
 		assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
-		Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets());
+		assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets());
 
-		assertTrue(consumerFunction.getRestoredState() == null);
+		// the new partitions should have been considered as restored state
+		assertTrue(consumerFunction.getRestoredState() != null);
+		assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+		for (Map.Entry<KafkaTopicPartition, Long> expectedEntry : expectedSubscribedPartitionsWithStartOffsets.entrySet()) {
+			assertEquals(expectedEntry.getValue(), consumerFunction.getRestoredState().get(expectedEntry.getKey()));
+		}
 
 		consumerOperator.close();
 		consumerOperator.cancel();
@@ -264,7 +280,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 	public void testRestore() throws Exception {
 		final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
 
-		final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(partitions);
+		final DummyFlinkKafkaConsumer<String> consumerFunction =
+			new DummyFlinkKafkaConsumer<>(partitions, FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED);
 
 		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
 				new StreamSource<>(consumerFunction);
@@ -290,16 +307,56 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
 
 		// on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
-		Assert.assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets());
+		assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets());
 
 		// assert that state is correctly restored from legacy checkpoint
 		assertTrue(consumerFunction.getRestoredState() != null);
-		Assert.assertEquals(PARTITION_STATE, consumerFunction.getRestoredState());
+		assertEquals(PARTITION_STATE, consumerFunction.getRestoredState());
 
 		consumerOperator.close();
 		consumerOperator.cancel();
 	}
 
+	/**
+	 * Test restoring from savepoints before version Flink 1.3 should fail if discovery is enabled.
+	 */
+	@Test
+	public void testRestoreFailsWithNonEmptyPreFlink13StatesIfDiscoveryEnabled() throws Exception {
+		assumeTrue(testMigrateVersion == MigrationVersion.v1_1 || testMigrateVersion == MigrationVersion.v1_2);
+
+		final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
+
+		final DummyFlinkKafkaConsumer<String> consumerFunction =
+			new DummyFlinkKafkaConsumer<>(partitions, 1000L); // discovery enabled
+
+		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
+			new StreamSource<>(consumerFunction);
+
+		final AbstractStreamOperatorTestHarness<String> testHarness =
+			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
+
+		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		testHarness.setup();
+
+		// restore state from binary snapshot file; should fail since discovery is enabled
+		try {
+			MigrationTestUtil.restoreFromSnapshot(
+				testHarness,
+				OperatorSnapshotUtil.getResourceFilename(
+					"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"),
+				testMigrateVersion);
+
+			fail("Restore from savepoints from version before Flink 1.3.x should have failed if discovery is enabled.");
+		} catch (Exception e) {
+			if (testMigrateVersion == MigrationVersion.v1_1) {
+				Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+			} else {
+				Assert.assertTrue(e instanceof IllegalArgumentException);
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
@@ -310,16 +367,23 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		private final AbstractFetcher<T, ?> fetcher;
 
 		@SuppressWarnings("unchecked")
-		DummyFlinkKafkaConsumer(AbstractFetcher<T, ?> fetcher, List<KafkaTopicPartition> partitions) {
-			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class));
+		DummyFlinkKafkaConsumer(
+				AbstractFetcher<T, ?> fetcher,
+				List<KafkaTopicPartition> partitions,
+				long discoveryInterval) {
+
+			super(
+				Arrays.asList("dummy-topic"),
+				null,
+				(KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class),
+				discoveryInterval);
+
 			this.fetcher = fetcher;
 			this.partitions = partitions;
 		}
 
-		DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions) {
-			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class));
-			this.fetcher = mock(AbstractFetcher.class);
-			this.partitions = partitions;
+		DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions, long discoveryInterval) {
+			this(mock(AbstractFetcher.class), partitions, discoveryInterval);
 		}
 
 		@Override
@@ -334,8 +398,21 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		}
 
 		@Override
-		protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
-			return partitions;
+		protected AbstractPartitionDiscoverer createPartitionDiscoverer(
+				KafkaTopicsDescriptor topicsDescriptor,
+				int indexOfThisSubtask,
+				int numParallelSubtasks) {
+
+			AbstractPartitionDiscoverer mockPartitionDiscoverer = mock(AbstractPartitionDiscoverer.class);
+
+			try {
+				when(mockPartitionDiscoverer.discoverPartitions()).thenReturn(partitions);
+			} catch (Exception e) {
+				// ignore
+			}
+			when(mockPartitionDiscoverer.setAndCheckDiscoveredPartition(any(KafkaTopicPartition.class))).thenReturn(true);
+
+			return mockPartitionDiscoverer;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index d673e8e..e0508ce 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -27,15 +27,17 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.commons.collections.map.LinkedMap;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -45,7 +47,6 @@ import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -133,8 +134,15 @@ public class FlinkKafkaConsumerBaseTest {
 		listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L));
 
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+		when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+		when(context.getIndexOfThisSubtask()).thenReturn(0);
+		consumer.setRuntimeContext(context);
 
-		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+		// mock old 1.2 state (empty)
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState<Serializable>());
+		// mock 1.3 state
+		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
 		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
 
@@ -176,7 +184,10 @@ public class FlinkKafkaConsumerBaseTest {
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		TestingListState<Serializable> listState = new TestingListState<>();
-		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+		// mock old 1.2 state (empty)
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState<Serializable>());
+		// mock 1.3 state
+		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
 		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
 
@@ -192,34 +203,8 @@ public class FlinkKafkaConsumerBaseTest {
 		assertFalse(listState.get().iterator().hasNext());
 	}
 
-	/**
-	 * Tests that on snapshots, states and offsets to commit to Kafka are correct.
-	 */
-	@SuppressWarnings("unchecked")
 	@Test
-	public void checkUseFetcherWhenNoCheckpoint() throws Exception {
-
-		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
-		List<KafkaTopicPartition> partitionList = new ArrayList<>(1);
-		partitionList.add(new KafkaTopicPartition("test", 0));
-		consumer.setSubscribedPartitions(partitionList);
-
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-		TestingListState<Serializable> listState = new TestingListState<>();
-		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
-		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
-
-		// make the context signal that there is no restored state, then validate that
-		when(initializationContext.isRestored()).thenReturn(false);
-		consumer.initializeState(initializationContext);
-		consumer.run(mock(SourceFunction.SourceContext.class));
-	}
-
-	@Test
-	public void testConfigureOnCheckpointsCommitMode() {
+	public void testConfigureOnCheckpointsCommitMode() throws Exception {
 
 		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
 		consumer.setIsAutoCommitEnabled(true); // this should be ignored
@@ -235,7 +220,7 @@ public class FlinkKafkaConsumerBaseTest {
 	}
 
 	@Test
-	public void testConfigureAutoCommitMode() {
+	public void testConfigureAutoCommitMode() throws Exception {
 
 		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
 		consumer.setIsAutoCommitEnabled(true);
@@ -251,7 +236,7 @@ public class FlinkKafkaConsumerBaseTest {
 	}
 
 	@Test
-	public void testConfigureDisableOffsetCommitWithCheckpointing() {
+	public void testConfigureDisableOffsetCommitWithCheckpointing() throws Exception {
 
 		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
 		consumer.setIsAutoCommitEnabled(true); // this should be ignored
@@ -269,7 +254,7 @@ public class FlinkKafkaConsumerBaseTest {
 	}
 
 	@Test
-	public void testConfigureDisableOffsetCommitWithoutCheckpointing() {
+	public void testConfigureDisableOffsetCommitWithoutCheckpointing() throws Exception {
 
 		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
 		consumer.setIsAutoCommitEnabled(false);
@@ -321,8 +306,10 @@ public class FlinkKafkaConsumerBaseTest {
 		OperatorStateStore backend = mock(OperatorStateStore.class);
 
 		TestingListState<Serializable> listState = new TestingListState<>();
-
-		when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+		// mock old 1.2 state (empty)
+		when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState<Serializable>());
+		// mock 1.3 state
+		when(backend.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
 		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
 
@@ -448,8 +435,10 @@ public class FlinkKafkaConsumerBaseTest {
 		OperatorStateStore backend = mock(OperatorStateStore.class);
 
 		TestingListState<Serializable> listState = new TestingListState<>();
-
-		when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+		// mock old 1.2 state (empty)
+		when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState<Serializable>());
+		// mock 1.3 state
+		when(backend.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
 		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
 
@@ -567,7 +556,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		@SuppressWarnings("unchecked")
 		public DummyFlinkKafkaConsumer() {
-			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
+			super(Arrays.asList("dummy-topic"), null, (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class), 0);
 		}
 
 		@Override
@@ -583,8 +572,11 @@ public class FlinkKafkaConsumerBaseTest {
 		}
 
 		@Override
-		protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
-			return Collections.emptyList();
+		protected AbstractPartitionDiscoverer createPartitionDiscoverer(
+				KafkaTopicsDescriptor topicsDescriptor,
+				int indexOfThisSubtask,
+				int numParallelSubtasks) {
+			return mock(AbstractPartitionDiscoverer.class);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
deleted file mode 100644
index 0be1d57..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests that the partition assignment is deterministic and stable.
- */
-public class KafkaConsumerPartitionAssignmentTest {
-
-	@Test
-	public void testPartitionsEqualConsumers() {
-		try {
-			List<KafkaTopicPartition> inPartitions = Arrays.asList(
-					new KafkaTopicPartition("test-topic", 4),
-					new KafkaTopicPartition("test-topic", 52),
-					new KafkaTopicPartition("test-topic", 17),
-					new KafkaTopicPartition("test-topic", 1));
-
-			for (int i = 0; i < inPartitions.size(); i++) {
-				Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
-				FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-					subscribedPartitionsToStartOffsets,
-					inPartitions,
-					i,
-					inPartitions.size(),
-					StartupMode.GROUP_OFFSETS,
-					null);
-
-				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
-
-				assertEquals(1, subscribedPartitions.size());
-				assertTrue(contains(inPartitions, subscribedPartitions.get(0).getPartition()));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMultiplePartitionsPerConsumers() {
-		try {
-			final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-
-			final List<KafkaTopicPartition> partitions = new ArrayList<>();
-			final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
-
-			for (int p : partitionIDs) {
-				KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
-				partitions.add(part);
-				allPartitions.add(part);
-			}
-
-			final int numConsumers = 3;
-			final int minPartitionsPerConsumer = partitions.size() / numConsumers;
-			final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
-
-			for (int i = 0; i < numConsumers; i++) {
-				Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
-				FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-					subscribedPartitionsToStartOffsets,
-					partitions,
-					i,
-					numConsumers,
-					StartupMode.GROUP_OFFSETS,
-					null);
-
-				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
-
-				assertTrue(subscribedPartitions.size() >= minPartitionsPerConsumer);
-				assertTrue(subscribedPartitions.size() <= maxPartitionsPerConsumer);
-
-				for (KafkaTopicPartition p : subscribedPartitions) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPartitionsFewerThanConsumers() {
-		try {
-			List<KafkaTopicPartition> inPartitions = Arrays.asList(
-					new KafkaTopicPartition("test-topic", 4),
-					new KafkaTopicPartition("test-topic", 52),
-					new KafkaTopicPartition("test-topic", 17),
-					new KafkaTopicPartition("test-topic", 1));
-
-			final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
-			allPartitions.addAll(inPartitions);
-
-			final int numConsumers = 2 * inPartitions.size() + 3;
-
-			for (int i = 0; i < numConsumers; i++) {
-				Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
-				FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-					subscribedPartitionsToStartOffsets,
-					inPartitions,
-					i,
-					numConsumers,
-					StartupMode.GROUP_OFFSETS,
-					null);
-
-				List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
-
-				assertTrue(subscribedPartitions.size() <= 1);
-
-				for (KafkaTopicPartition p : subscribedPartitions) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testAssignEmptyPartitions() {
-		try {
-			List<KafkaTopicPartition> ep = new ArrayList<>();
-			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets,
-				ep,
-				2,
-				4,
-				StartupMode.GROUP_OFFSETS,
-				null);
-			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
-
-			subscribedPartitionsToStartOffsets = new HashMap<>();
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets,
-				ep,
-				0,
-				1,
-				StartupMode.GROUP_OFFSETS,
-				null);
-			assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testGrowingPartitionsRemainsStable() {
-		try {
-			final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-			List<KafkaTopicPartition> newPartitions = new ArrayList<>();
-
-			for (int p : newPartitionIDs) {
-				KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
-				newPartitions.add(part);
-			}
-
-			List<KafkaTopicPartition> initialPartitions = newPartitions.subList(0, 7);
-
-			final Set<KafkaTopicPartition> allNewPartitions = new HashSet<>(newPartitions);
-			final Set<KafkaTopicPartition> allInitialPartitions = new HashSet<>(initialPartitions);
-
-			final int numConsumers = 3;
-			final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
-			final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1;
-			final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
-			final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
-
-			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets1 = new HashMap<>();
-			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets2 = new HashMap<>();
-			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets3 = new HashMap<>();
-
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets1,
-				initialPartitions,
-				0,
-				numConsumers,
-				StartupMode.GROUP_OFFSETS,
-				null);
-
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets2,
-				initialPartitions,
-				1,
-				numConsumers,
-				StartupMode.GROUP_OFFSETS,
-				null);
-
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets3,
-				initialPartitions,
-				2,
-				numConsumers,
-				StartupMode.GROUP_OFFSETS,
-				null);
-
-			List<KafkaTopicPartition> subscribedPartitions1 = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
-			List<KafkaTopicPartition> subscribedPartitions2 = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
-			List<KafkaTopicPartition> subscribedPartitions3 = new ArrayList<>(subscribedPartitionsToStartOffsets3.keySet());
-
-			assertTrue(subscribedPartitions1.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(subscribedPartitions1.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(subscribedPartitions2.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(subscribedPartitions2.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(subscribedPartitions3.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(subscribedPartitions3.size() <= maxInitialPartitionsPerConsumer);
-
-			for (KafkaTopicPartition p : subscribedPartitions1) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p));
-			}
-
-			for (KafkaTopicPartition p : subscribedPartitions2) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p));
-			}
-
-			for (KafkaTopicPartition p : subscribedPartitions3) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p));
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allInitialPartitions.isEmpty());
-
-			// grow the set of partitions and distribute anew
-
-			subscribedPartitionsToStartOffsets1 = new HashMap<>();
-			subscribedPartitionsToStartOffsets2 = new HashMap<>();
-			subscribedPartitionsToStartOffsets3 = new HashMap<>();
-
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets1,
-				newPartitions,
-				0,
-				numConsumers,
-				StartupMode.GROUP_OFFSETS,
-				null);
-
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets2,
-				newPartitions,
-				1,
-				numConsumers,
-				StartupMode.GROUP_OFFSETS,
-				null);
-
-			FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
-				subscribedPartitionsToStartOffsets3,
-				newPartitions,
-				2,
-				numConsumers,
-				StartupMode.GROUP_OFFSETS,
-				null);
-
-			List<KafkaTopicPartition> subscribedPartitions1New = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
-			List<KafkaTopicPartition> subscribedPartitions2New = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
-			List<KafkaTopicPartition> subscribedPartitions3New = new ArrayList<>(subscribedPartitionsToStartOffsets3.keySet());
-
-			// new partitions must include all old partitions
-
-			assertTrue(subscribedPartitions1New.size() > subscribedPartitions1.size());
-			assertTrue(subscribedPartitions2New.size() > subscribedPartitions2.size());
-			assertTrue(subscribedPartitions3New.size() > subscribedPartitions3.size());
-
-			assertTrue(subscribedPartitions1New.containsAll(subscribedPartitions1));
-			assertTrue(subscribedPartitions2New.containsAll(subscribedPartitions2));
-			assertTrue(subscribedPartitions3New.containsAll(subscribedPartitions3));
-
-			assertTrue(subscribedPartitions1New.size() >= minNewPartitionsPerConsumer);
-			assertTrue(subscribedPartitions1New.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(subscribedPartitions2New.size() >= minNewPartitionsPerConsumer);
-			assertTrue(subscribedPartitions2New.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(subscribedPartitions3New.size() >= minNewPartitionsPerConsumer);
-			assertTrue(subscribedPartitions3New.size() <= maxNewPartitionsPerConsumer);
-
-			for (KafkaTopicPartition p : subscribedPartitions1New) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p));
-			}
-			for (KafkaTopicPartition p : subscribedPartitions2New) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p));
-			}
-			for (KafkaTopicPartition p : subscribedPartitions3New) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p));
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allNewPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) {
-		for (KafkaTopicPartition ktp : inPartitions) {
-			if (ktp.getPartition() == partition) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 03a23f5..27e9ccd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -187,7 +186,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 				RuntimeException re = (RuntimeException) jee.getCause();
 
-				assertTrue(re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
+				assertTrue(re.getMessage().contains("Unable to retrieve any partitions"));
 			}
 		}
 	}
@@ -1625,7 +1624,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}
 		});
 
-		JobExecutionResult result = tryExecute(env1, "Consume " + elementCount + " elements from Kafka");
+		tryExecute(env1, "Consume " + elementCount + " elements from Kafka");
 
 		deleteTestTopic(topic);
 	}