Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/01 07:35:59 UTC

[08/14] flink git commit: [FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer

[FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer

This closes #3746.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/085d4db8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/085d4db8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/085d4db8

Branch: refs/heads/master
Commit: 085d4db849b9c659ba6bb2c7d66a04debeb048c4
Parents: b8c8f20
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Apr 20 17:17:43 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Jul 1 15:33:42 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  15 +
 .../connectors/kafka/FlinkKafkaConsumer010.java |  12 +
 .../internal/Kafka010PartitionDiscoverer.java   |  38 +
 .../connectors/kafka/Kafka010FetcherTest.java   | 483 -----------
 .../kafka/internal/Kafka010FetcherTest.java     | 480 +++++++++++
 .../connectors/kafka/FlinkKafkaConsumer08.java  | 173 +---
 .../kafka/internals/ClosableBlockingQueue.java  | 506 -----------
 .../kafka/internals/Kafka08Fetcher.java         |  25 +-
 .../internals/Kafka08PartitionDiscoverer.java   | 293 +++++++
 .../kafka/internals/PartitionInfoFetcher.java   |  18 +-
 .../internals/PeriodicOffsetCommitter.java      |   9 +-
 .../connectors/kafka/KafkaConsumer08Test.java   |  51 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  16 +-
 .../internals/ClosableBlockingQueueTest.java    | 611 --------------
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  59 +-
 .../kafka/internal/Kafka09Fetcher.java          |  10 +-
 .../internal/Kafka09PartitionDiscoverer.java    | 103 +++
 .../kafka/internal/KafkaConsumerThread.java     | 229 ++++-
 .../connectors/kafka/Kafka09FetcherTest.java    | 482 -----------
 .../kafka/internal/Kafka09FetcherTest.java      | 479 +++++++++++
 .../kafka/internal/KafkaConsumerThreadTest.java | 843 +++++++++++++++++++
 .../kafka/FlinkKafkaConsumerBase.java           | 497 ++++++-----
 .../kafka/internals/AbstractFetcher.java        | 235 ++++--
 .../internals/AbstractPartitionDiscoverer.java  | 256 ++++++
 .../kafka/internals/ClosableBlockingQueue.java  | 506 +++++++++++
 .../kafka/internals/KafkaTopicPartition.java    |  19 +
 .../kafka/internals/KafkaTopicsDescriptor.java  |  74 ++
 .../FlinkKafkaConsumerBaseMigrationTest.java    | 121 ++-
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  74 +-
 .../KafkaConsumerPartitionAssignmentTest.java   | 356 --------
 .../connectors/kafka/KafkaConsumerTestBase.java |   5 +-
 .../kafka/internals/AbstractFetcherTest.java    |  18 +-
 .../AbstractPartitionDiscovererTest.java        | 428 ++++++++++
 .../internals/ClosableBlockingQueueTest.java    | 611 ++++++++++++++
 34 files changed, 5055 insertions(+), 3080 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index bc7e7de..23ad3b8 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -292,6 +292,21 @@ Flink on YARN supports automatic restart of lost YARN containers.
 
 If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
 
+### Kafka Consumers Partition Discovery
+
+The Flink Kafka Consumer supports discovering dynamically created Kafka partitions, and consumes them with
+exactly-once guarantees. All partitions discovered after the initial retrieval of partition metadata (i.e., when the
+job starts running) will be consumed from the earliest possible offset.
+
+By default, partition discovery is disabled. To enable it, set a non-negative value
+for `flink.partition-discovery.interval-millis` in the provided properties config,
+representing the discovery interval in milliseconds. 
+
+<span class="label label-danger">Limitation</span> When the consumer is restored from a savepoint from Flink versions
+prior to Flink 1.3.x, partition discovery cannot be enabled on the restore run. If enabled, the restore would fail
+with an exception. In this case, in order to use partition discovery, please first take a savepoint in Flink 1.3.x and
+then restore again from that.
+
 ### Kafka Consumers Offset Committing Behaviour Configuration
 
 The Flink Kafka Consumer allows configuring the behaviour of how offsets
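
As a quick illustration of the documentation change above, here is a minimal sketch of how a user would switch on the new partition discovery from job code. The broker address, group id, topic name and 10-second interval are illustrative placeholders and not part of this commit; the consumer constructor and the `flink.partition-discovery.interval-millis` key follow the docs and sources added here.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class PartitionDiscoveryExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative broker address
		props.setProperty("group.id", "example-group");           // illustrative group id

		// enable dynamic partition discovery: probe for newly created partitions every 10 seconds
		props.setProperty("flink.partition-discovery.interval-millis", "10000");

		FlinkKafkaConsumer010<String> consumer =
				new FlinkKafkaConsumer010<>("example-topic", new SimpleStringSchema(), props);

		DataStream<String> stream = env.addSource(consumer);
		stream.print();

		env.execute("Kafka partition discovery example");
	}
}

With this setting, any partitions created after the job starts would be picked up automatically and, per the documentation above, consumed from the earliest possible offset.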

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 1bbd1dc..b851e3e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -22,8 +22,11 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -159,4 +162,13 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 				pollTimeout,
 				useMetrics);
 	}
+
+	@Override
+	protected AbstractPartitionDiscoverer createPartitionDiscoverer(
+			KafkaTopicsDescriptor topicsDescriptor,
+			int indexOfThisSubtask,
+			int numParallelSubtasks) {
+
+		return new Kafka010PartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010PartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010PartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010PartitionDiscoverer.java
new file mode 100644
index 0000000..0c10f40
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010PartitionDiscoverer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
+
+import java.util.Properties;
+
+/**
+ * A partition discoverer that can be used to discover topics and partitions metadata
+ * from Kafka brokers via the Kafka 0.10 high-level consumer API.
+ */
+public class Kafka010PartitionDiscoverer extends Kafka09PartitionDiscoverer {
+
+	public Kafka010PartitionDiscoverer(
+		KafkaTopicsDescriptor topicsDescriptor,
+		int indexOfThisSubtask,
+		int numParallelSubtasks,
+		Properties kafkaProperties) {
+
+		super(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, kafkaProperties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
deleted file mode 100644
index aedd4ba..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ /dev/null
@@ -1,483 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-/**
- * Unit tests for the {@link Kafka010Fetcher}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConsumerThread.class)
-public class Kafka010FetcherTest {
-
-	@Test
-	public void testCommitDoesNotBlock() throws Exception {
-
-		// test data
-		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
-		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
-		testCommitData.put(testPartition, 11L);
-
-		// to synchronize when the consumer is in its blocking method
-		final OneShotLatch sync = new OneShotLatch();
-
-		// ----- the mock consumer with blocking poll calls ----
-		final MultiShotLatch blockerLatch = new MultiShotLatch();
-
-		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-
-			@Override
-			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
-				sync.trigger();
-				blockerLatch.await();
-				return ConsumerRecords.empty();
-			}
-		});
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) {
-				blockerLatch.trigger();
-				return null;
-			}
-		}).when(mockConsumer).wakeup();
-
-		// make sure the fetcher creates the mock consumer
-		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-		// ----- create the test fetcher -----
-
-		@SuppressWarnings("unchecked")
-		SourceContext<String> sourceContext = mock(SourceContext.class);
-		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
-			Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
-		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-				sourceContext,
-				partitionsWithInitialOffsets,
-				null, /* periodic assigner */
-				null, /* punctuated assigner */
-				new TestProcessingTimeService(),
-				10,
-				getClass().getClassLoader(),
-				"taskname-with-subtask",
-				new UnregisteredMetricsGroup(),
-				schema,
-				new Properties(),
-				0L,
-				false);
-
-		// ----- run the fetcher -----
-
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-		final Thread fetcherRunner = new Thread("fetcher runner") {
-
-			@Override
-			public void run() {
-				try {
-					fetcher.runFetchLoop();
-				} catch (Throwable t) {
-					error.set(t);
-				}
-			}
-		};
-		fetcherRunner.start();
-
-		// wait until the fetcher has reached the method of interest
-		sync.await();
-
-		// ----- trigger the offset commit -----
-
-		final AtomicReference<Throwable> commitError = new AtomicReference<>();
-		final Thread committer = new Thread("committer runner") {
-			@Override
-			public void run() {
-				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData);
-				} catch (Throwable t) {
-					commitError.set(t);
-				}
-			}
-		};
-		committer.start();
-
-		// ----- ensure that the committer finishes in time  -----
-		committer.join(30000);
-		assertFalse("The committer did not finish in time", committer.isAlive());
-
-		// ----- test done, wait till the fetcher is done for a clean shutdown -----
-		fetcher.cancel();
-		fetcherRunner.join();
-
-		// check that there were no errors in the fetcher
-		final Throwable fetcherError = error.get();
-		if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
-			throw new Exception("Exception in the fetcher", fetcherError);
-		}
-
-		final Throwable committerError = commitError.get();
-		if (committerError != null) {
-			throw new Exception("Exception in the committer", committerError);
-		}
-	}
-
-	@Test
-	public void ensureOffsetsGetCommitted() throws Exception {
-
-		// test data
-		final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
-		final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
-
-		final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
-		testCommitData1.put(testPartition1, 11L);
-		testCommitData1.put(testPartition2, 18L);
-
-		final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
-		testCommitData2.put(testPartition1, 19L);
-		testCommitData2.put(testPartition2, 28L);
-
-		final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
-
-		// ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
-
-		final MultiShotLatch blockerLatch = new MultiShotLatch();
-
-		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-
-		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-			@Override
-			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
-				blockerLatch.await();
-				return ConsumerRecords.empty();
-			}
-		});
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) {
-				blockerLatch.trigger();
-				return null;
-			}
-		}).when(mockConsumer).wakeup();
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) {
-				@SuppressWarnings("unchecked")
-				Map<TopicPartition, OffsetAndMetadata> offsets =
-						(Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
-
-				OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
-
-				commitStore.add(offsets);
-				callback.onComplete(offsets, null);
-
-				return null;
-			}
-		}).when(mockConsumer).commitAsync(
-				Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
-
-		// make sure the fetcher creates the mock consumer
-		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-		// ----- create the test fetcher -----
-
-		@SuppressWarnings("unchecked")
-		SourceContext<String> sourceContext = mock(SourceContext.class);
-		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
-			Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
-		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-				sourceContext,
-				partitionsWithInitialOffsets,
-				null, /* periodic assigner */
-				null, /* punctuated assigner */
-				new TestProcessingTimeService(),
-				10,
-				getClass().getClassLoader(),
-				"taskname-with-subtask",
-				new UnregisteredMetricsGroup(),
-				schema,
-				new Properties(),
-				0L,
-				false);
-
-		// ----- run the fetcher -----
-
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-		final Thread fetcherRunner = new Thread("fetcher runner") {
-
-			@Override
-			public void run() {
-				try {
-					fetcher.runFetchLoop();
-				} catch (Throwable t) {
-					error.set(t);
-				}
-			}
-		};
-		fetcherRunner.start();
-
-		// ----- trigger the first offset commit -----
-
-		fetcher.commitInternalOffsetsToKafka(testCommitData1);
-		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
-
-		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
-			TopicPartition partition = entry.getKey();
-			if (partition.topic().equals("test")) {
-				assertEquals(42, partition.partition());
-				assertEquals(12L, entry.getValue().offset());
-			}
-			else if (partition.topic().equals("another")) {
-				assertEquals(99, partition.partition());
-				assertEquals(18L, entry.getValue().offset());
-			}
-		}
-
-		// ----- trigger the second offset commit -----
-
-		fetcher.commitInternalOffsetsToKafka(testCommitData2);
-		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
-
-		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
-			TopicPartition partition = entry.getKey();
-			if (partition.topic().equals("test")) {
-				assertEquals(42, partition.partition());
-				assertEquals(20L, entry.getValue().offset());
-			}
-			else if (partition.topic().equals("another")) {
-				assertEquals(99, partition.partition());
-				assertEquals(28L, entry.getValue().offset());
-			}
-		}
-
-		// ----- test done, wait till the fetcher is done for a clean shutdown -----
-		fetcher.cancel();
-		fetcherRunner.join();
-
-		// check that there were no errors in the fetcher
-		final Throwable caughtError = error.get();
-		if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
-			throw new Exception("Exception in the fetcher", caughtError);
-		}
-	}
-
-	@Test
-	public void testCancellationWhenEmitBlocks() throws Exception {
-
-		// ----- some test data -----
-
-		final String topic = "test-topic";
-		final int partition = 3;
-		final byte[] payload = new byte[] {1, 2, 3, 4};
-
-		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
-
-		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
-		data.put(new TopicPartition(topic, partition), records);
-
-		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
-
-		// ----- the test consumer -----
-
-		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-			@Override
-			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
-				return consumerRecords;
-			}
-		});
-
-		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-		// ----- build a fetcher -----
-
-		BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
-		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
-			Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
-		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-				sourceContext,
-				partitionsWithInitialOffsets,
-				null, /* periodic watermark extractor */
-				null, /* punctuated watermark extractor */
-				new TestProcessingTimeService(),
-				10, /* watermark interval */
-				this.getClass().getClassLoader(),
-				"task_name",
-				new UnregisteredMetricsGroup(),
-				schema,
-				new Properties(),
-				0L,
-				false);
-
-		// ----- run the fetcher -----
-
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-		final Thread fetcherRunner = new Thread("fetcher runner") {
-
-			@Override
-			public void run() {
-				try {
-					fetcher.runFetchLoop();
-				} catch (Throwable t) {
-					error.set(t);
-				}
-			}
-		};
-		fetcherRunner.start();
-
-		// wait until the thread started to emit records to the source context
-		sourceContext.waitTillHasBlocker();
-
-		// now we try to cancel the fetcher, including the interruption usually done on the task thread
-		// once it has finished, there must be no more thread blocked on the source context
-		fetcher.cancel();
-		fetcherRunner.interrupt();
-		fetcherRunner.join();
-
-		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
-	}
-
-	// ------------------------------------------------------------------------
-	//  test utilities
-	// ------------------------------------------------------------------------
-
-	private static final class BlockingSourceContext<T> implements SourceContext<T> {
-
-		private final ReentrantLock lock = new ReentrantLock();
-		private final OneShotLatch inBlocking = new OneShotLatch();
-
-		@Override
-		public void collect(T element) {
-			block();
-		}
-
-		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			block();
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			block();
-		}
-
-		@Override
-		public void markAsTemporarilyIdle() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return new Object();
-		}
-
-		@Override
-		public void close() {}
-
-		public void waitTillHasBlocker() throws InterruptedException {
-			inBlocking.await();
-		}
-
-		public boolean isStillBlocking() {
-			return lock.isLocked();
-		}
-
-		@SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
-		private void block() {
-			lock.lock();
-			try {
-				inBlocking.trigger();
-
-				// put this thread to sleep indefinitely
-				final Object o = new Object();
-				while (true) {
-					synchronized (o) {
-						o.wait();
-					}
-				}
-			}
-			catch (InterruptedException e) {
-				// exit cleanly, simply reset the interruption flag
-				Thread.currentThread().interrupt();
-			}
-			finally {
-				lock.unlock();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
new file mode 100644
index 0000000..2ea1622
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka010Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Kafka010FetcherTest {
+
+	@Test
+	public void testCommitDoesNotBlock() throws Exception {
+
+		// test data
+		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+		testCommitData.put(testPartition, 11L);
+
+		// to synchronize when the consumer is in its blocking method
+		final OneShotLatch sync = new OneShotLatch();
+
+		// ----- the mock consumer with blocking poll calls ----
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+				sync.trigger();
+				blockerLatch.await();
+				return ConsumerRecords.empty();
+			}
+		});
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				blockerLatch.trigger();
+				return null;
+			}
+		}).when(mockConsumer).wakeup();
+
+		// make sure the fetcher creates the mock consumer
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- create the test fetcher -----
+
+		@SuppressWarnings("unchecked")
+		SourceContext<String> sourceContext = mock(SourceContext.class);
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+				sourceContext,
+				partitionsWithInitialOffsets,
+				null, /* periodic assigner */
+				null, /* punctuated assigner */
+				new TestProcessingTimeService(),
+				10,
+				getClass().getClassLoader(),
+				"taskname-with-subtask",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				false);
+
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// wait until the fetcher has reached the method of interest
+		sync.await();
+
+		// ----- trigger the offset commit -----
+
+		final AtomicReference<Throwable> commitError = new AtomicReference<>();
+		final Thread committer = new Thread("committer runner") {
+			@Override
+			public void run() {
+				try {
+					fetcher.commitInternalOffsetsToKafka(testCommitData);
+				} catch (Throwable t) {
+					commitError.set(t);
+				}
+			}
+		};
+		committer.start();
+
+		// ----- ensure that the committer finishes in time  -----
+		committer.join(30000);
+		assertFalse("The committer did not finish in time", committer.isAlive());
+
+		// ----- test done, wait till the fetcher is done for a clean shutdown -----
+		fetcher.cancel();
+		fetcherRunner.join();
+
+		// check that there were no errors in the fetcher
+		final Throwable fetcherError = error.get();
+		if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
+			throw new Exception("Exception in the fetcher", fetcherError);
+		}
+
+		final Throwable committerError = commitError.get();
+		if (committerError != null) {
+			throw new Exception("Exception in the committer", committerError);
+		}
+	}
+
+	@Test
+	public void ensureOffsetsGetCommitted() throws Exception {
+
+		// test data
+		final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+		final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+
+		final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+		testCommitData1.put(testPartition1, 11L);
+		testCommitData1.put(testPartition2, 18L);
+
+		final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+		testCommitData2.put(testPartition1, 19L);
+		testCommitData2.put(testPartition2, 28L);
+
+		final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+		// ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+				blockerLatch.await();
+				return ConsumerRecords.empty();
+			}
+		});
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				blockerLatch.trigger();
+				return null;
+			}
+		}).when(mockConsumer).wakeup();
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				@SuppressWarnings("unchecked")
+				Map<TopicPartition, OffsetAndMetadata> offsets =
+						(Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+				OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+				commitStore.add(offsets);
+				callback.onComplete(offsets, null);
+
+				return null;
+			}
+		}).when(mockConsumer).commitAsync(
+				Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+		// make sure the fetcher creates the mock consumer
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- create the test fetcher -----
+
+		@SuppressWarnings("unchecked")
+		SourceContext<String> sourceContext = mock(SourceContext.class);
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+				sourceContext,
+				partitionsWithInitialOffsets,
+				null, /* periodic assigner */
+				null, /* punctuated assigner */
+				new TestProcessingTimeService(),
+				10,
+				getClass().getClassLoader(),
+				"taskname-with-subtask",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				false);
+
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// ----- trigger the first offset commit -----
+
+		fetcher.commitInternalOffsetsToKafka(testCommitData1);
+		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+			TopicPartition partition = entry.getKey();
+			if (partition.topic().equals("test")) {
+				assertEquals(42, partition.partition());
+				assertEquals(12L, entry.getValue().offset());
+			}
+			else if (partition.topic().equals("another")) {
+				assertEquals(99, partition.partition());
+				assertEquals(18L, entry.getValue().offset());
+			}
+		}
+
+		// ----- trigger the second offset commit -----
+
+		fetcher.commitInternalOffsetsToKafka(testCommitData2);
+		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+			TopicPartition partition = entry.getKey();
+			if (partition.topic().equals("test")) {
+				assertEquals(42, partition.partition());
+				assertEquals(20L, entry.getValue().offset());
+			}
+			else if (partition.topic().equals("another")) {
+				assertEquals(99, partition.partition());
+				assertEquals(28L, entry.getValue().offset());
+			}
+		}
+
+		// ----- test done, wait till the fetcher is done for a clean shutdown -----
+		fetcher.cancel();
+		fetcherRunner.join();
+
+		// check that there were no errors in the fetcher
+		final Throwable caughtError = error.get();
+		if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
+			throw new Exception("Exception in the fetcher", caughtError);
+		}
+	}
+
+	@Test
+	public void testCancellationWhenEmitBlocks() throws Exception {
+
+		// ----- some test data -----
+
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- build a fetcher -----
+
+		BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+				sourceContext,
+				partitionsWithInitialOffsets,
+				null, /* periodic watermark extractor */
+				null, /* punctuated watermark extractor */
+				new TestProcessingTimeService(),
+				10, /* watermark interval */
+				this.getClass().getClassLoader(),
+				"task_name",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				false);
+
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// wait until the thread started to emit records to the source context
+		sourceContext.waitTillHasBlocker();
+
+		// now we try to cancel the fetcher, including the interruption usually done on the task thread
+		// once it has finished, there must be no more thread blocked on the source context
+		fetcher.cancel();
+		fetcherRunner.interrupt();
+		fetcherRunner.join();
+
+		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+	}
+
+	// ------------------------------------------------------------------------
+	//  test utilities
+	// ------------------------------------------------------------------------
+
+	private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+		private final ReentrantLock lock = new ReentrantLock();
+		private final OneShotLatch inBlocking = new OneShotLatch();
+
+		@Override
+		public void collect(T element) {
+			block();
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			block();
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			block();
+		}
+
+		@Override
+		public void markAsTemporarilyIdle() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return new Object();
+		}
+
+		@Override
+		public void close() {}
+
+		public void waitTillHasBlocker() throws InterruptedException {
+			inBlocking.await();
+		}
+
+		public boolean isStillBlocking() {
+			return lock.isLocked();
+		}
+
+		@SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+		private void block() {
+			lock.lock();
+			try {
+				inBlocking.trigger();
+
+				// put this thread to sleep indefinitely
+				final Object o = new Object();
+				while (true) {
+					synchronized (o) {
+						o.wait();
+					}
+				}
+			}
+			catch (InterruptedException e) {
+				// exit cleanly, simply reset the interruption flag
+				Thread.currentThread().interrupt();
+			}
+			finally {
+				lock.unlock();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 6c7b94d..c4cd3e7 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -22,38 +22,26 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;
 
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.Node;
 
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.PropertiesUtil.getInt;
+import static org.apache.flink.util.PropertiesUtil.getLong;
 
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -172,9 +160,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	 *           The properties that are used to configure both the fetcher and the offset handler.
 	 */
 	public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
-		super(topics, deserializer);
+		super(
+				topics,
+				null,
+				deserializer,
+				getLong(props, KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
 
-		checkNotNull(topics, "topics");
 		this.kafkaProperties = checkNotNull(props, "props");
 
 		// validate the zookeeper properties
@@ -212,22 +203,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	}
 
 	@Override
-	protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
-		// Connect to a broker to get the partitions for all topics
-		List<KafkaTopicPartition> partitionInfos =
-			KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, kafkaProperties));
-
-		if (partitionInfos.size() == 0) {
-			throw new RuntimeException(
-				"Unable to retrieve any partitions for the requested topics " + topics +
-					". Please check previous log entries");
-		}
-
-		if (LOG.isInfoEnabled()) {
-			logPartitionInfo(LOG, partitionInfos);
-		}
+	protected AbstractPartitionDiscoverer createPartitionDiscoverer(
+			KafkaTopicsDescriptor topicsDescriptor,
+			int indexOfThisSubtask,
+			int numParallelSubtasks) {
 
-		return partitionInfos;
+		return new Kafka08PartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, kafkaProperties);
 	}
 
 	@Override
@@ -237,104 +218,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Kafka / ZooKeeper communication utilities
+	//  Kafka / ZooKeeper configuration utilities
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Send request to Kafka to get partitions for topic.
-	 *
-	 * @param topics The name of the topics.
-	 * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic.
-	 */
-	public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
-		String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
-		final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
-
-		checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
-		String[] seedBrokers = seedBrokersConfString.split(",");
-		List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
-
-		final String clientId = "flink-kafka-consumer-partition-lookup";
-		final int soTimeout = getInt(properties, "socket.timeout.ms", 30000);
-		final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536);
-
-		Random rnd = new Random();
-		retryLoop: for (int retry = 0; retry < numRetries; retry++) {
-			// we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
-			// parallel source instances start. Still, we try all available brokers.
-			int index = rnd.nextInt(seedBrokers.length);
-			brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
-				String seedBroker = seedBrokers[index];
-				LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
-				if (++index == seedBrokers.length) {
-					index = 0;
-				}
-
-				URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
-				SimpleConsumer consumer = null;
-				try {
-					consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
-
-					TopicMetadataRequest req = new TopicMetadataRequest(topics);
-					kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
-					List<TopicMetadata> metaData = resp.topicsMetadata();
-
-					// clear in case we have an incomplete list from previous tries
-					partitions.clear();
-					for (TopicMetadata item : metaData) {
-						if (item.errorCode() != ErrorMapping.NoError()) {
-							// warn and try more brokers
-							LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " +
-									"for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage());
-							continue brokersLoop;
-						}
-						if (!topics.contains(item.topic())) {
-							LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
-							continue brokersLoop;
-						}
-						for (PartitionMetadata part : item.partitionsMetadata()) {
-							Node leader = brokerToNode(part.leader());
-							KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
-							KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
-							partitions.add(pInfo);
-						}
-					}
-					break retryLoop; // leave the loop through the brokers
-				}
-				catch (Exception e) {
-					//validates seed brokers in case of a ClosedChannelException
-					validateSeedBrokers(seedBrokers, e);
-					LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}",
-							seedBroker, topics, e.getClass().getName(), e.getMessage());
-					LOG.debug("Detailed trace", e);
-					// we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
-					try {
-						Thread.sleep(500);
-					} catch (InterruptedException e1) {
-						// sleep shorter.
-					}
-				} finally {
-					if (consumer != null) {
-						consumer.close();
-					}
-				}
-			} // brokers loop
-		} // retries loop
-		return partitions;
-	}
-
-	/**
-	 * Turn a broker instance into a node instance.
-	 * @param broker broker instance
-	 * @return Node representing the given broker
-	 */
-	private static Node brokerToNode(Broker broker) {
-		return new Node(broker.id(), broker.host(), broker.port());
-	}
-
-	/**
 	 * Validate the ZK configuration, checking for required parameters.
+	 *
 	 * @param props Properties to check
 	 */
 	protected static void validateZooKeeperConfig(Properties props) {
@@ -364,36 +253,6 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	}
 
 	/**
-	 * Validate that at least one seed broker is valid in case of a
-	 * ClosedChannelException.
-	 *
-	 * @param seedBrokers
-	 *            array containing the seed brokers e.g. ["host1:port1",
-	 *            "host2:port2"]
-	 * @param exception
-	 *            instance
-	 */
-	private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
-		if (!(exception instanceof ClosedChannelException)) {
-			return;
-		}
-		int unknownHosts = 0;
-		for (String broker : seedBrokers) {
-			URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim());
-			try {
-				InetAddress.getByName(brokerUrl.getHost());
-			} catch (UnknownHostException e) {
-				unknownHosts++;
-			}
-		}
-		// throw meaningful exception if all the provided hosts are invalid
-		if (unknownHosts == seedBrokers.length) {
-			throw new IllegalArgumentException("All the servers provided in: '"
-					+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
-		}
-	}
-
-	/**
 	 * Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting
 	 * the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception
 	 * right after a task is started.

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
deleted file mode 100644
index da61dd0..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A special form of blocking queue with two additions:
- * <ol>
- *     <li>The queue can be closed atomically when empty. Adding elements after the queue
- *         is closed fails. This allows queue consumers to atomically discover that no elements
- *         are available and mark themselves as shut down.</li>
- *     <li>The queue allows to poll batches of elements in one polling call.</li>
- * </ol>
- *
- * <p>The queue has no capacity restriction and is safe for multiple producers and consumers.
- *
- * <p>Note: Null elements are prohibited.
- *
- * @param <E> The type of elements in the queue.
- */
-public class ClosableBlockingQueue<E> {
-
-	/** The lock used to make queue accesses and open checks atomic. */
-	private final ReentrantLock lock;
-
-	/** The condition on which blocking get-calls wait if the queue is empty. */
-	private final Condition nonEmpty;
-
-	/** The deque of elements. */
-	private final ArrayDeque<E> elements;
-
-	/** Flag marking the status of the queue. */
-	private volatile boolean open;
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new empty queue.
-	 */
-	public ClosableBlockingQueue() {
-		this(10);
-	}
-
-	/**
-	 * Creates a new empty queue, reserving space for at least the specified number
-	 * of elements. The queue can still grow, if more elements are added than the
-	 * reserved space.
-	 *
-	 * @param initialSize The number of elements to reserve space for.
-	 */
-	public ClosableBlockingQueue(int initialSize) {
-		this.lock = new ReentrantLock(true);
-		this.nonEmpty = this.lock.newCondition();
-
-		this.elements = new ArrayDeque<>(initialSize);
-		this.open = true;
-
-	}
-
-	/**
-	 * Creates a new queue that contains the given elements.
-	 *
-	 * @param initialElements The elements to initially add to the queue.
-	 */
-	public ClosableBlockingQueue(Collection<? extends E> initialElements) {
-		this(initialElements.size());
-		this.elements.addAll(initialElements);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Size and status
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the number of elements currently in the queue.
-	 * @return The number of elements currently in the queue.
-	 */
-	public int size() {
-		lock.lock();
-		try {
-			return elements.size();
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	/**
-	 * Checks whether the queue is empty (has no elements).
-	 * @return True, if the queue is empty; false, if it is non-empty.
-	 */
-	public boolean isEmpty() {
-		return size() == 0;
-	}
-
-	/**
-	 * Checks whether the queue is currently open, meaning elements can be added and polled.
-	 * @return True, if the queue is open; false, if it is closed.
-	 */
-	public boolean isOpen() {
-		return open;
-	}
-
-	/**
-	 * Tries to close the queue. Closing the queue only succeeds when no elements are
-	 * in the queue when this method is called. Checking whether the queue is empty, and
-	 * marking the queue as closed is one atomic operation.
-	 *
-	 * @return True, if the queue is closed, false if the queue remains open.
-	 */
-	public boolean close() {
-		lock.lock();
-		try {
-			if (open) {
-				if (elements.isEmpty()) {
-					open = false;
-					nonEmpty.signalAll();
-					return true;
-				} else {
-					return false;
-				}
-			}
-			else {
-				// already closed
-				return true;
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Adding / Removing elements
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Tries to add an element to the queue, if the queue is still open. Checking whether the queue
-	 * is open and adding the element is one atomic operation.
-	 *
-	 * <p>Unlike the {@link #add(Object)} method, this method never throws an exception,
-	 * but only indicates via the return code if the element was added or the
-	 * queue was closed.
-	 *
-	 * @param element The element to add.
-	 * @return True, if the element was added, false if the queue was closed.
-	 */
-	public boolean addIfOpen(E element) {
-		requireNonNull(element);
-
-		lock.lock();
-		try {
-			if (open) {
-				elements.addLast(element);
-				if (elements.size() == 1) {
-					nonEmpty.signalAll();
-				}
-			}
-			return open;
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	/**
-	 * Adds the element to the queue, or fails with an exception, if the queue is closed.
-	 * Checking whether the queue is open and adding the element is one atomic operation.
-	 *
-	 * @param element The element to add.
-	 * @throws IllegalStateException Thrown, if the queue is closed.
-	 */
-	public void add(E element) throws IllegalStateException {
-		requireNonNull(element);
-
-		lock.lock();
-		try {
-			if (open) {
-				elements.addLast(element);
-				if (elements.size() == 1) {
-					nonEmpty.signalAll();
-				}
-			} else {
-				throw new IllegalStateException("queue is closed");
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	/**
-	 * Returns the queue's next element without removing it, if the queue is non-empty.
-	 * Otherwise, returns null.
-	 *
-	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
-	 * Checking whether the queue is open and getting the next element is one atomic operation.
-	 *
-	 * <p>This method never blocks.
-	 *
-	 * @return The queue's next element, or null, if the queue is empty.
-	 * @throws IllegalStateException Thrown, if the queue is closed.
-	 */
-	public E peek() {
-		lock.lock();
-		try {
-			if (open) {
-				if (elements.size() > 0) {
-					return elements.getFirst();
-				} else {
-					return null;
-				}
-			} else {
-				throw new IllegalStateException("queue is closed");
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	/**
-	 * Returns the queue's next element and removes it, if the queue is non-empty.
-	 * Otherwise, this method returns null.
-	 *
-	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
-	 * Checking whether the queue is open and removing the next element is one atomic operation.
-	 *
-	 * <p>This method never blocks.
-	 *
-	 * @return The queue's next element, or null, if the queue is empty.
-	 * @throws IllegalStateException Thrown, if the queue is closed.
-	 */
-	public E poll() {
-		lock.lock();
-		try {
-			if (open) {
-				if (elements.size() > 0) {
-					return elements.removeFirst();
-				} else {
-					return null;
-				}
-			} else {
-				throw new IllegalStateException("queue is closed");
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	/**
-	 * Returns all of the queue's current elements in a list, if the queue is non-empty.
-	 * Otherwise, this method returns null.
-	 *
-	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
-	 * Checking whether the queue is open and removing the elements is one atomic operation.
-	 *
-	 * <p>This method never blocks.
-	 *
-	 * @return All of the queue's elements, or null, if the queue is empty.
-	 * @throws IllegalStateException Thrown, if the queue is closed.
-	 */
-	public List<E> pollBatch() {
-		lock.lock();
-		try {
-			if (open) {
-				if (elements.size() > 0) {
-					ArrayList<E> result = new ArrayList<>(elements);
-					elements.clear();
-					return result;
-				} else {
-					return null;
-				}
-			} else {
-				throw new IllegalStateException("queue is closed");
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	/**
-	 * Returns the next element in the queue. If the queue is empty, this method
-	 * waits until at least one element is added.
-	 *
-	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
-	 * Checking whether the queue is open and removing the next element is one atomic operation.
-	 *
-	 * @return The next element in the queue, never null.
-	 *
-	 * @throws IllegalStateException Thrown, if the queue is closed.
-	 * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
-	 *                              element to be added.
-	 */
-	public E getElementBlocking() throws InterruptedException {
-		lock.lock();
-		try {
-			while (open && elements.isEmpty()) {
-				nonEmpty.await();
-			}
-
-			if (open) {
-				return elements.removeFirst();
-			} else {
-				throw new IllegalStateException("queue is closed");
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	/**
-	 * Returns the next element in the queue. If the queue is empty, this method
-	 * waits at most a certain time until an element becomes available. If no element
-	 * is available after that time, the method returns null.
-	 *
-	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
-	 * Checking whether the queue is open and removing the next element is one atomic operation.
-	 *
-	 * @param timeoutMillis The number of milliseconds to block, at most.
-	 * @return The next element in the queue, or null, if the timeout expires before an element is available.
-	 *
-	 * @throws IllegalStateException Thrown, if the queue is closed.
-	 * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
-	 *                              element to be added.
-	 */
-	public E getElementBlocking(long timeoutMillis) throws InterruptedException {
-		if (timeoutMillis == 0L) {
-			// wait forever case
-			return getElementBlocking();
-		} else if (timeoutMillis < 0L) {
-			throw new IllegalArgumentException("invalid timeout");
-		}
-
-		final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
-
-		lock.lock();
-		try {
-			while (open && elements.isEmpty() && timeoutMillis > 0) {
-				nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
-				timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
-			}
-
-			if (!open) {
-				throw new IllegalStateException("queue is closed");
-			}
-			else if (elements.isEmpty()) {
-				return null;
-			} else {
-				return elements.removeFirst();
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	/**
-	 * Gets all the elements currently in the queue, or blocks until at least one
-	 * element is added. If the queue is empty when this method is called, it blocks
-	 * until at least one element is added.
-	 *
-	 * <p>This method always returns a list with at least one element.
-	 *
-	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
-	 * Checking whether the queue is open and removing the next element is one atomic operation.
-	 *
-	 * @return A list with all elements in the queue, always at least one element.
-	 *
-	 * @throws IllegalStateException Thrown, if the queue is closed.
-	 * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
-	 *                              element to be added.
-	 */
-	public List<E> getBatchBlocking() throws InterruptedException {
-		lock.lock();
-		try {
-			while (open && elements.isEmpty()) {
-				nonEmpty.await();
-			}
-			if (open) {
-				ArrayList<E> result = new ArrayList<>(elements);
-				elements.clear();
-				return result;
-			} else {
-				throw new IllegalStateException("queue is closed");
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	/**
-	 * Gets all the elements currently in the queue, or blocks until at least one
-	 * element is added. This method is similar to {@link #getBatchBlocking()}, but
-	 * waits at most the given number of milliseconds before returning.
-	 *
-	 * <p>This method never returns null, but an empty list, if the queue is empty when
-	 * the method is called and the request times out before an element was added.
-	 *
-	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
-	 * Checking whether the queue is open and removing the next element is one atomic operation.
-	 *
-	 * @param timeoutMillis The number of milliseconds to wait, at most.
-	 * @return A list with all elements in the queue, possibly an empty list.
-	 *
-	 * @throws IllegalStateException Thrown, if the queue is closed.
-	 * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
-	 *                              element to be added.
-	 */
-	public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException {
-		if (timeoutMillis == 0L) {
-			// wait forever case
-			return getBatchBlocking();
-		} else if (timeoutMillis < 0L) {
-			throw new IllegalArgumentException("invalid timeout");
-		}
-
-		final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
-
-		lock.lock();
-		try {
-			while (open && elements.isEmpty() && timeoutMillis > 0) {
-				nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
-				timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
-			}
-
-			if (!open) {
-				throw new IllegalStateException("queue is closed");
-			}
-			else if (elements.isEmpty()) {
-				return Collections.emptyList();
-			}
-			else {
-				ArrayList<E> result = new ArrayList<>(elements);
-				elements.clear();
-				return result;
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Standard Utilities
-	// ------------------------------------------------------------------------
-
-	@Override
-	public int hashCode() {
-		int hashCode = 17;
-		for (E element : elements) {
-			hashCode = 31 * hashCode + element.hashCode();
-		}
-		return hashCode;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		} else if (obj != null && obj.getClass() == ClosableBlockingQueue.class) {
-			@SuppressWarnings("unchecked")
-			ClosableBlockingQueue<E> that = (ClosableBlockingQueue<E>) obj;
-
-			if (this.elements.size() == that.elements.size()) {
-				Iterator<E> thisElements = this.elements.iterator();
-				for (E thatNext : that.elements) {
-					E thisNext = thisElements.next();
-					if (!(thisNext == null ? thatNext == null : thisNext.equals(thatNext))) {
-						return false;
-					}
-				}
-				return true;
-			} else {
-				return false;
-			}
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return elements.toString();
-	}
-}
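
[Editor's note: for readers skimming the deleted class above, a minimal consumer-side sketch of its semantics, using only the methods visible in the hunk. The wrapper class, element type and 100 ms timeout are illustrative only, not part of the commit.]

    import java.util.List;
    import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;

    public class QueueConsumerSketch {
        static void drainUntilClosed(ClosableBlockingQueue<String> queue) throws InterruptedException {
            while (true) {
                // returns an empty list if the 100 ms timeout expires before an element arrives
                List<String> batch = queue.getBatchBlocking(100);
                for (String element : batch) {
                    // process element
                }
                // close() only succeeds while the queue is empty, so no element added before
                // the close can be lost; producers calling addIfOpen() afterwards get 'false'
                if (batch.isEmpty() && queue.close()) {
                    break;
                }
            }
        }
    }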

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 659bbd7..aa7649c 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -72,9 +72,6 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	/** The subtask's runtime context. */
 	private final RuntimeContext runtimeContext;
 
-	/** The queue of partitions that are currently not assigned to a broker connection. */
-	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;
-
 	/** The behavior to use in case that an offset is not valid (any more) for a partition. */
 	private final long invalidOffsetBehavior;
 
@@ -89,7 +86,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 
 	public Kafka08Fetcher(
 			SourceContext<T> sourceContext,
-			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+			Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
@@ -99,7 +96,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 			boolean useMetrics) throws Exception {
 		super(
 				sourceContext,
-				assignedPartitionsWithInitialOffsets,
+				seedPartitionsWithInitialOffsets,
 				watermarksPeriodic,
 				watermarksPunctuated,
 				runtimeContext.getProcessingTimeService(),
@@ -112,12 +109,6 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 		this.runtimeContext = runtimeContext;
 		this.invalidOffsetBehavior = getInvalidOffsetBehavior(kafkaProperties);
 		this.autoCommitInterval = autoCommitInterval;
-		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
-
-		// initially, all these partitions are not assigned to a specific broker connection
-		for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
-			unassignedPartitionsQueue.add(partition);
-		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -172,8 +163,11 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 			if (autoCommitInterval > 0) {
 				LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval);
 
-				periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler,
-						subscribedPartitionStates(), errorHandler, autoCommitInterval);
+				periodicCommitter = new PeriodicOffsetCommitter(
+						zookeeperOffsetHandler,
+						subscribedPartitionStates(),
+						errorHandler,
+						autoCommitInterval);
 				periodicCommitter.setName("Periodic Kafka partition offset committer");
 				periodicCommitter.setDaemon(true);
 				periodicCommitter.start();
@@ -343,7 +337,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
+	protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
 		return new TopicAndPartition(partition.getTopic(), partition.getPartition());
 	}
 
@@ -369,8 +363,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 		}
 
 		// Set committed offsets in topic partition state
-		KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitionStates();
-		for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
+		for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
 			Long offset = offsets.get(partition.getKafkaTopicPartition());
 			if (offset != null) {
 				partition.setCommittedOffset(offset);

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08PartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08PartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08PartitionDiscoverer.java
new file mode 100644
index 0000000..9730114
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08PartitionDiscoverer.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.util.NetUtils;
+
+import kafka.cluster.Broker;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.DEFAULT_GET_PARTITIONS_RETRIES;
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getInt;
+
+/**
+ * A partition discoverer that can be used to discover topics and partitions metadata
+ * from Kafka brokers via the Kafka 0.8 low-level consumer API.
+ */
+public class Kafka08PartitionDiscoverer extends AbstractPartitionDiscoverer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Kafka08PartitionDiscoverer.class);
+
+	private static final String dummyClientId = "flink-kafka-consumer-partition-lookup";
+
+	/** All seed broker addresses. */
+	private final String[] seedBrokerAddresses;
+
+	/** Configuration for the Kafka client. */
+	private final int numRetries;
+	private final int soTimeout;
+	private final int bufferSize;
+
+	/**
+	 * The current seed broker address to use.
+	 * Each subtask starts with a seed broker assigned to it in round-robin fashion.
+	 * If this subtask fails in any one of the fetch attempts, the next address in the seed brokers list
+	 * will be used.
+	 */
+	private int currentContactSeedBrokerIndex;
+
+	/** Low-level consumer used to fetch topics and partitions metadata. */
+	private SimpleConsumer consumer;
+
+	public Kafka08PartitionDiscoverer(
+			KafkaTopicsDescriptor topicsDescriptor,
+			int indexOfThisSubtask,
+			int numParallelSubtasks,
+			Properties kafkaProperties) {
+
+		super(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks);
+
+		checkNotNull(kafkaProperties);
+
+		String seedBrokersConfString = kafkaProperties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+		checkArgument(seedBrokersConfString != null && !seedBrokersConfString.isEmpty(),
+				"Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+		this.seedBrokerAddresses = seedBrokersConfString.split(",");
+
+		// evenly distribute seed brokers across subtasks, to
+		// avoid too much pressure on a single broker on startup
+		this.currentContactSeedBrokerIndex = indexOfThisSubtask % seedBrokerAddresses.length;
+
+		this.numRetries = getInt(kafkaProperties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
+		this.soTimeout = getInt(kafkaProperties, "socket.timeout.ms", 30000);
+		this.bufferSize = getInt(kafkaProperties, "socket.receive.buffer.bytes", 65536);
+	}
+
+	@Override
+	protected void initializeConnections() {
+		URL contactUrl = NetUtils.getCorrectHostnamePort(seedBrokerAddresses[currentContactSeedBrokerIndex]);
+		this.consumer = new SimpleConsumer(contactUrl.getHost(), contactUrl.getPort(), soTimeout, bufferSize, dummyClientId);
+	}
+
+	@Override
+	protected List<String> getAllTopics() {
+		List<String> topics = new LinkedList<>();
+
+		retryLoop: for (int retry = 0; retry < numRetries; retry++) {
+			brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokerAddresses.length; arrIdx++) {
+				LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBrokerAddresses[currentContactSeedBrokerIndex], retry, numRetries);
+
+				try {
+					// clear in case we have an incomplete list from previous tries
+					topics.clear();
+
+					for (TopicMetadata item : consumer.send(new TopicMetadataRequest(Collections.<String>emptyList())).topicsMetadata()) {
+						if (item.errorCode() != ErrorMapping.NoError()) {
+							// warn and try more brokers
+							LOG.warn("Error while getting metadata from broker {} to find partitions for {}. Error: {}.",
+								seedBrokerAddresses[currentContactSeedBrokerIndex], topics.toString(), ErrorMapping.exceptionFor(item.errorCode()).getMessage());
+
+							useNextAddressAsNewContactSeedBroker();
+							continue brokersLoop;
+						}
+
+						topics.add(item.topic());
+					}
+					break retryLoop; // leave the loop through the brokers
+				}
+				catch (Exception e) {
+					//validates seed brokers in case of a ClosedChannelException
+					validateSeedBrokers(seedBrokerAddresses, e);
+					LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}",
+						seedBrokerAddresses[currentContactSeedBrokerIndex], topics, e.getClass().getName(), e.getMessage());
+					LOG.debug("Detailed trace", e);
+
+					// we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
+					try {
+						Thread.sleep(500);
+					} catch (InterruptedException e1) {
+						// sleep shorter.
+					}
+
+					useNextAddressAsNewContactSeedBroker();
+				}
+			} // brokers loop
+		} // retries loop
+
+		return topics;
+	}
+
+	@Override
+	public List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) {
+		return KafkaTopicPartition.dropLeaderData(getPartitionLeadersForTopics(topics));
+	}
+
+	@Override
+	protected void wakeupConnections() {
+		// nothing to do, as Kafka 0.8's SimpleConsumer does not support wakeup
+	}
+
+	@Override
+	protected void closeConnections() throws Exception {
+		if (consumer != null) {
+			SimpleConsumer consumer = this.consumer;
+			consumer.close();
+
+			// de-reference the consumer to avoid closing multiple times
+			this.consumer = null;
+		}
+	}
+
+	/**
+	 * Send request to Kafka to get partitions for topics.
+	 *
+	 * @param topics The name of the topics.
+	 */
+	public List<KafkaTopicPartitionLeader> getPartitionLeadersForTopics(List<String> topics) {
+		List<KafkaTopicPartitionLeader> partitions = new LinkedList<>();
+
+		retryLoop: for (int retry = 0; retry < numRetries; retry++) {
+			brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokerAddresses.length; arrIdx++) {
+				LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBrokerAddresses[currentContactSeedBrokerIndex], retry, numRetries);
+
+				try {
+					// clear in case we have an incomplete list from previous tries
+					partitions.clear();
+
+					for (TopicMetadata item : consumer.send(new TopicMetadataRequest(topics)).topicsMetadata()) {
+						if (item.errorCode() != ErrorMapping.NoError()) {
+							// warn and try more brokers
+							LOG.warn("Error while getting metadata from broker {} to find partitions for {}. Error: {}.",
+								seedBrokerAddresses[currentContactSeedBrokerIndex], topics.toString(), ErrorMapping.exceptionFor(item.errorCode()).getMessage());
+
+							useNextAddressAsNewContactSeedBroker();
+							continue brokersLoop;
+						}
+
+						if (!topics.contains(item.topic())) {
+							LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
+
+							useNextAddressAsNewContactSeedBroker();
+							continue brokersLoop;
+						}
+
+						for (PartitionMetadata part : item.partitionsMetadata()) {
+							Node leader = brokerToNode(part.leader());
+							KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
+							KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
+							partitions.add(pInfo);
+						}
+					}
+					break retryLoop; // leave the loop through the brokers
+				}
+				catch (Exception e) {
+					//validates seed brokers in case of a ClosedChannelException
+					validateSeedBrokers(seedBrokerAddresses, e);
+					LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}",
+						seedBrokerAddresses[currentContactSeedBrokerIndex], topics, e.getClass().getName(), e.getMessage());
+					LOG.debug("Detailed trace", e);
+
+					// we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
+					try {
+						Thread.sleep(500);
+					} catch (InterruptedException e1) {
+						// sleep shorter.
+					}
+
+					useNextAddressAsNewContactSeedBroker();
+				}
+			} // brokers loop
+		} // retries loop
+
+		return partitions;
+	}
+
+	/**
+	 * Re-establish broker connection using the next available seed broker address.
+	 */
+	private void useNextAddressAsNewContactSeedBroker() {
+		if (++currentContactSeedBrokerIndex == seedBrokerAddresses.length) {
+			currentContactSeedBrokerIndex = 0;
+		}
+
+		URL newContactUrl = NetUtils.getCorrectHostnamePort(seedBrokerAddresses[currentContactSeedBrokerIndex]);
+		this.consumer = new SimpleConsumer(newContactUrl.getHost(), newContactUrl.getPort(), soTimeout, bufferSize, dummyClientId);
+	}
+
+	/**
+	 * Turn a broker instance into a node instance.
+	 *
+	 * @param broker broker instance
+	 * @return Node representing the given broker
+	 */
+	private static Node brokerToNode(Broker broker) {
+		return new Node(broker.id(), broker.host(), broker.port());
+	}
+
+	/**
+	 * Validate that at least one seed broker is valid in case of a
+	 * ClosedChannelException.
+	 *
+	 * @param seedBrokers
+	 *            array containing the seed brokers e.g. ["host1:port1",
+	 *            "host2:port2"]
+	 * @param exception
+	 *            the exception to check
+	 */
+	private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
+		if (!(exception instanceof ClosedChannelException)) {
+			return;
+		}
+		int unknownHosts = 0;
+		for (String broker : seedBrokers) {
+			URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim());
+			try {
+				InetAddress.getByName(brokerUrl.getHost());
+			} catch (UnknownHostException e) {
+				unknownHosts++;
+			}
+		}
+		// throw meaningful exception if all the provided hosts are invalid
+		if (unknownHosts == seedBrokers.length) {
+			throw new IllegalArgumentException("All the servers provided in: '"
+				+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
+		}
+	}
+}
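
[Editor's note: outside of a running consumer, the new discoverer can also be driven directly; the PartitionInfoFetcher change further below does exactly this. A standalone sketch under assumed values (broker addresses and topic name are placeholders; with subtask index 0 of parallelism 1 the subtask-assignment logic is effectively unused):]

    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;

    public class DiscovererSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "host1:9092,host2:9092"); // placeholder seed brokers

            Kafka08PartitionDiscoverer discoverer = new Kafka08PartitionDiscoverer(
                    new KafkaTopicsDescriptor(Collections.singletonList("my-topic"), null), 0, 1, props);
            try {
                discoverer.open();
                List<KafkaTopicPartitionLeader> leaders =
                        discoverer.getPartitionLeadersForTopics(Collections.singletonList("my-topic"));
                System.out.println("Found " + leaders.size() + " partition leaders");
            } finally {
                discoverer.close();
            }
        }
    }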

http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
index ecf1378..836ed6b 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
@@ -18,31 +18,37 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-
 import java.util.List;
 import java.util.Properties;
 
 class PartitionInfoFetcher extends Thread {
 
 	private final List<String> topics;
-	private final Properties properties;
+	private final Kafka08PartitionDiscoverer partitionDiscoverer;
 
 	private volatile List<KafkaTopicPartitionLeader> result;
 	private volatile Throwable error;
 
 	PartitionInfoFetcher(List<String> topics, Properties properties) {
+		// we're only using partial functionality of the partition discoverer; the subtask id argument doesn't matter
+		this.partitionDiscoverer = new Kafka08PartitionDiscoverer(new KafkaTopicsDescriptor(topics, null), 0, 1, properties);
 		this.topics = topics;
-		this.properties = properties;
 	}
 
 	@Override
 	public void run() {
 		try {
-			result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
+			partitionDiscoverer.open();
+			result = partitionDiscoverer.getPartitionLeadersForTopics(topics);
 		}
 		catch (Throwable t) {
 			this.error = t;
+		} finally {
+			try {
+				partitionDiscoverer.close();
+			} catch (Exception e) {
+				throw new RuntimeException("Error while closing partition discoverer.", e);
+			}
 		}
 	}
 
@@ -57,9 +63,11 @@ class PartitionInfoFetcher extends Thread {
 		if (error != null) {
 			throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error);
 		}
+
 		if (result != null) {
 			return result;
 		}
+
 		throw new Exception("Partition fetching failed");
 	}
 }
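
[Editor's note: a rough caller-side sketch of the refactored fetcher thread. The blocking result accessor belongs to the parts of PartitionInfoFetcher not shown in these hunks, so it is only referenced in a comment; broker addresses and topic name are placeholders.]

    // must live in org.apache.flink.streaming.connectors.kafka.internals,
    // since PartitionInfoFetcher is package-private
    import java.util.Collections;
    import java.util.Properties;

    class FetcherCallerSketch {
        static void fetchAsync() throws InterruptedException {
            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "host1:9092,host2:9092"); // placeholder seed brokers

            PartitionInfoFetcher fetcher =
                    new PartitionInfoFetcher(Collections.singletonList("my-topic"), kafkaProps);
            fetcher.start(); // run() opens the discoverer, fetches the leaders, then closes it
            fetcher.join();
            // the caller would now use the fetcher's blocking accessor (defined outside the
            // hunks above), which rethrows 'error' on failure and otherwise returns 'result'
        }
    }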