Posted to commits@flink.apache.org by se...@apache.org on 2016/11/16 18:09:55 UTC

[5/5] flink git commit: [FLINK-5048] [kafka consumer] Change thread model of FlinkKafkaConsumer to better handle shutdown/interrupt situations

[FLINK-5048] [kafka consumer] Change thread model of FlinkKafkaConsumer to better handle shutdown/interrupt situations

Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate thread that operated Kafka's consumer.
That thread was shielded from interrupts, because the Kafka consumer does not handle thread interrupts well.
Since that thread was also the thread that emitted records, it could block in the network stack (backpressure) or in chained operators.
The latter case led to situations where cancellation became very slow unless the thread was interrupted (which it could not be).

This commit changes the thread model (a simplified sketch follows below):
  - A spawned consumer thread polls a batch of records from the KafkaConsumer and pushes the
    batch of records into a blocking queue (size one)
  - The main thread of the task pulls the record batches from the blocking queue and
    emits the records.
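
As an illustration only (not the classes in this commit), the pattern boils down to a
size-one blocking exchange between the two threads. The real implementation uses the new
Handover class further below, which adds error reporting, closing, and wakeup without
interrupts; this simplified Java sketch shows just the size-one hand-over:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class HandoverSketch {

        public static void main(String[] args) throws Exception {
            // a capacity-one queue: the polling thread can be at most one batch ahead
            final BlockingQueue<String> handover = new ArrayBlockingQueue<>(1);

            // the spawned thread: polls "batches" and hands them over,
            // blocking if the previous batch has not been taken yet
            Thread consumerThread = new Thread(() -> {
                try {
                    int batch = 0;
                    while (true) {
                        handover.put("batch-" + batch++);
                    }
                } catch (InterruptedException e) {
                    // shutdown path for this sketch
                }
            });
            consumerThread.setDaemon(true);
            consumerThread.start();

            // the task's main thread: pulls the batches and emits the records
            for (int i = 0; i < 3; i++) {
                String batch = handover.take();
                System.out.println("emitting records from " + batch);
            }
        }
    }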


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a66e7ad1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a66e7ad1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a66e7ad1

Branch: refs/heads/master
Commit: a66e7ad14e41fa07737f447d68920ad5cc4ed6d3
Parents: fa1864c
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 10 11:13:43 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 16 19:08:07 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |  20 +
 .../kafka/internal/Kafka010Fetcher.java         |   7 +-
 .../internal/KafkaConsumerCallBridge010.java    |  40 ++
 .../connectors/kafka/Kafka010FetcherTest.java   | 172 ++++++++-
 .../kafka/KafkaShortRetention010ITCase.java     |  34 --
 .../connectors/kafka/internal/Handover.java     | 214 ++++++++++
 .../kafka/internal/Kafka09Fetcher.java          | 274 +++----------
 .../kafka/internal/KafkaConsumerCallBridge.java |  41 ++
 .../kafka/internal/KafkaConsumerThread.java     | 332 ++++++++++++++++
 .../connectors/kafka/Kafka09FetcherTest.java    | 164 +++++++-
 .../kafka/KafkaShortRetention09ITCase.java      |  34 --
 .../connectors/kafka/internal/HandoverTest.java | 387 +++++++++++++++++++
 .../kafka/KafkaShortRetentionTestBase.java      |   1 +
 13 files changed, 1422 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index cc7f56f..32bc1d2 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -100,6 +100,26 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Throws the given {@code Throwable} in scenarios where the signature allows
+	 * throwing an Exception. Errors and Exceptions are thrown directly; other "exotic"
+	 * subclasses of Throwable are wrapped in an Exception.
+	 *
+	 * @param t The throwable to be thrown.
+	 * @param parentMessage The message for the parent Exception, if one is needed.
+	 */
+	public static void rethrowException(Throwable t, String parentMessage) throws Exception {
+		if (t instanceof Error) {
+			throw (Error) t;
+		}
+		else if (t instanceof Exception) {
+			throw (Exception) t;
+		}
+		else {
+			throw new Exception(parentMessage, t);
+		}
+	}
+
+	/**
 	 * Tries to throw the given {@code Throwable} in scenarios where the signatures allows only IOExceptions
 	 * (and RuntimeException and Error). Throws this exception directly, if it is an IOException,
 	 * a RuntimeException, or an Error. Otherwise does nothing.

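For context, a hedged sketch (not part of the commit) of how this new helper can be used:
a caller whose signature only allows checked Exceptions but that catches Throwable can
delegate the rethrow-or-wrap decision to it. The class name and the guarded work are made
up for illustration.

    import org.apache.flink.util.ExceptionUtils;

    public class RethrowExample {

        // hypothetical caller: the signature permits only Exception, but we catch Throwable
        static void runGuarded(Runnable work) throws Exception {
            try {
                work.run();
            } catch (Throwable t) {
                // Errors and Exceptions are rethrown directly; other Throwables are wrapped
                ExceptionUtils.rethrowException(t, "Work failed: " + t.getMessage());
            }
        }

        public static void main(String[] args) throws Exception {
            runGuarded(() -> System.out.println("ok"));
        }
    }
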
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 024cd38..71dd29a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.List;
@@ -91,11 +90,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 
 	/**
 	 * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
-	 * changing the List in the signature to a Collection.
+	 * changing binary signatures.
 	 */
 	@Override
-	protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) {
-		consumer.assign(topicPartitions);
+	protected KafkaConsumerCallBridge010 createCallBridge() {
+		return new KafkaConsumerCallBridge010();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
new file mode 100644
index 0000000..a81b098
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls the {@link KafkaConsumer#assign(java.util.Collection)} method.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
+ * changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need two versions whose compiled code goes against different method signatures.
+ */
+public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
+
+	@Override
+	public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
+		consumer.assign(topicPartitions);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 037d25b..6ee0429 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -20,16 +20,20 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -45,6 +49,7 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -54,6 +59,7 @@ import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -69,7 +75,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
  * Unit tests for the {@link Kafka010Fetcher}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(Kafka010Fetcher.class)
+@PrepareForTest(KafkaConsumerThread.class)
 public class Kafka010FetcherTest {
 
     @Test
@@ -125,7 +131,7 @@ public class Kafka010FetcherTest {
                 getClass().getClassLoader(),
                 false, /* checkpointing */
                 "taskname-with-subtask",
-                mock(MetricGroup.class),
+                new UnregisteredMetricsGroup(),
                 schema,
                 new Properties(),
                 0L,
@@ -174,9 +180,13 @@ public class Kafka010FetcherTest {
         fetcherRunner.join();
 
         // check that there were no errors in the fetcher
-        final Throwable caughtError = error.get();
-        if (caughtError != null) {
-            throw new Exception("Exception in the fetcher", caughtError);
+        final Throwable fetcherError = error.get();
+        if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
+            throw new Exception("Exception in the fetcher", fetcherError);
+        }
+        final Throwable committerError = commitError.get();
+        if (committerError != null) {
+            throw new Exception("Exception in the committer", committerError);
         }
     }
 
@@ -258,7 +268,7 @@ public class Kafka010FetcherTest {
                 getClass().getClassLoader(),
                 false, /* checkpointing */
                 "taskname-with-subtask",
-                mock(MetricGroup.class),
+                new UnregisteredMetricsGroup(),
                 schema,
                 new Properties(),
                 0L,
@@ -321,8 +331,154 @@ public class Kafka010FetcherTest {
 
         // check that there were no errors in the fetcher
         final Throwable caughtError = error.get();
-        if (caughtError != null) {
+        if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
             throw new Exception("Exception in the fetcher", caughtError);
         }
     }
+
+    @Test
+    public void testCancellationWhenEmitBlocks() throws Exception {
+
+        // ----- some test data -----
+
+        final String topic = "test-topic";
+        final int partition = 3;
+        final byte[] payload = new byte[] {1, 2, 3, 4};
+
+        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+        data.put(new TopicPartition(topic, partition), records);
+
+        final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+        // ----- the test consumer -----
+
+        final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+            @Override
+            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+                return consumerRecords;
+            }
+        });
+
+        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+        // ----- build a fetcher -----
+
+        BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+                sourceContext,
+                topics,
+                null, /* periodic watermark extractor */
+                null, /* punctuated watermark extractor */
+                new TestProcessingTimeService(),
+                10, /* watermark interval */
+                this.getClass().getClassLoader(),
+                true, /* checkpointing */
+                "task_name",
+                new UnregisteredMetricsGroup(),
+                schema,
+                new Properties(),
+                0L,
+                false);
+
+
+        // ----- run the fetcher -----
+
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final Thread fetcherRunner = new Thread("fetcher runner") {
+
+            @Override
+            public void run() {
+                try {
+                    fetcher.runFetchLoop();
+                } catch (Throwable t) {
+                    error.set(t);
+                }
+            }
+        };
+        fetcherRunner.start();
+
+        // wait until the thread started to emit records to the source context
+        sourceContext.waitTillHasBlocker();
+
+        // now we try to cancel the fetcher, including the interruption usually done on the task thread
+        // once it has finished, there must be no more thread blocked on the source context
+        fetcher.cancel();
+        fetcherRunner.interrupt();
+        fetcherRunner.join();
+
+        assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utilities
+    // ------------------------------------------------------------------------
+
+    private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+        private final ReentrantLock lock = new ReentrantLock();
+        private final OneShotLatch inBlocking = new OneShotLatch();
+
+        @Override
+        public void collect(T element) {
+            block();
+        }
+
+        @Override
+        public void collectWithTimestamp(T element, long timestamp) {
+            block();
+        }
+
+        @Override
+        public void emitWatermark(Watermark mark) {
+            block();
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return new Object();
+        }
+
+        @Override
+        public void close() {}
+
+        public void waitTillHasBlocker() throws InterruptedException {
+            inBlocking.await();
+        }
+
+        public boolean isStillBlocking() {
+            return lock.isLocked();
+        }
+
+        @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+        private void block() {
+            lock.lock();
+            try {
+                inBlocking.trigger();
+
+                // put this thread to sleep indefinitely
+                final Object o = new Object();
+                while (true) {
+                    synchronized (o) {
+                        o.wait();
+                    }
+                }
+            }
+            catch (InterruptedException e) {
+                // exit cleanly, simply reset the interruption flag
+                Thread.currentThread().interrupt();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
deleted file mode 100644
index 1d36198..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention010ITCase extends KafkaShortRetentionTestBase {
-
-	@Test(timeout=60000)
-	public void testAutoOffsetReset() throws Exception {
-		runAutoOffsetResetTest();
-	}
-
-	@Test(timeout=60000)
-	public void testAutoOffsetResetNone() throws Exception {
-		runFailOnAutoOffsetResetNone();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
new file mode 100644
index 0000000..e6e3c51
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Handover is a utility to hand over data (a buffer of records) and exception from a
+ * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a
+ * "size one blocking queue", with some extras around exception reporting, closing, and
+ * waking up threads without {@link Thread#interrupt() interrupting} them.
+ * 
+ * <p>This class is used in the Flink Kafka Consumer to hand over data and exceptions between
+ * the thread that runs the KafkaConsumer class and the main thread.
+ * 
+ * <p>The Handover has the notion of "waking up" the producer thread with a {@link WakeupException}
+ * rather than a thread interrupt.
+ * 
+ * <p>The Handover can also be "closed", signalling from one thread to the other that
+ * the closing thread has terminated.
+ */
+@ThreadSafe
+public final class Handover implements Closeable {
+
+	private final Object lock = new Object();
+
+	private ConsumerRecords<byte[], byte[]> next;
+	private Throwable error;
+	private boolean wakeupProducer;
+
+	/**
+	 * Polls the next element from the Handover, possibly blocking until the next element is
+	 * available. This method behaves similar to polling from a blocking queue.
+	 * 
+	 * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then
+	 * that exception is thrown rather than an element being returned.
+	 * 
+	 * @return The next element (buffer of records, never null).
+	 * 
+	 * @throws ClosedException Thrown if the Handover was {@link #close() closed}.
+	 * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method.
+	 */
+	@Nonnull
+	public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
+		synchronized (lock) {
+			while (next == null && error == null) {
+				lock.wait();
+			}
+
+			ConsumerRecords<byte[], byte[]> n = next;
+			if (n != null) {
+				next = null;
+				lock.notifyAll();
+				return n;
+			}
+			else {
+				ExceptionUtils.rethrowException(error, error.getMessage());
+
+				// this statement cannot be reached since the above method always throws an exception
+				// this is only here to silence the compiler and any warnings
+				return ConsumerRecords.empty(); 
+			}
+		}
+	}
+
+	/**
+	 * Hands over an element from the producer. If the Handover already has an element that was
+	 * not yet picked up by the consumer thread, this call blocks until the consumer picks up that
+	 * previous element.
+	 * 
+	 * <p>This behavior is similar to a "size one" blocking queue.
+	 * 
+	 * @param element The next element to hand over.
+	 * 
+	 * @throws InterruptedException
+	 *                 Thrown, if the thread is interrupted while blocking for the Handover to be empty.
+	 * @throws WakeupException
+	 *                 Thrown, if the {@link #wakeupProducer()} method is called while blocking for
+	 *                 the Handover to be empty.
+	 * @throws ClosedException
+	 *                 Thrown if the Handover was closed or concurrently being closed.
+	 */
+	public void produce(final ConsumerRecords<byte[], byte[]> element)
+			throws InterruptedException, WakeupException, ClosedException {
+
+		checkNotNull(element);
+
+		synchronized (lock) {
+			while (next != null && !wakeupProducer) {
+				lock.wait();
+			}
+
+			wakeupProducer = false;
+
+			// if there is still an element, we must have been woken up
+			if (next != null) {
+				throw new WakeupException();
+			}
+			// if there is no error, then this is open and can accept this element
+			else if (error == null) {
+				next = element;
+				lock.notifyAll();
+			}
+			// an error marks this as closed for the producer
+			else {
+				throw new ClosedException();
+			}
+		}
+	}
+
+	/**
+	 * Reports an exception. The consumer will throw the given exception immediately, if
+	 * it is currently blocked in the {@link #pollNext()} method, or the next time it
+	 * calls that method.
+	 * 
+	 * <p>After this method has been called, no call to either {@link #produce(ConsumerRecords)}
+	 * or {@link #pollNext()} will ever return regularly any more, but will always return
+	 * exceptionally.
+	 * 
+	 * <p>If another exception was already reported, this method does nothing.
+	 * 
+	 * <p>For the producer, the Handover will appear as if it was {@link #close() closed}.
+	 * 
+	 * @param t The exception to report.
+	 */
+	public void reportError(Throwable t) {
+		checkNotNull(t);
+
+		synchronized (lock) {
+			// do not override the initial exception
+			if (error == null) {
+				error = t;
+			}
+			next = null;
+			lock.notifyAll();
+		}
+	}
+
+	/**
+	 * Closes the handover. Both the {@link #produce(ConsumerRecords)} method and the
+	 * {@link #pollNext()} will throw a {@link ClosedException} on any currently blocking and
+	 * future invocations.
+	 * 
+	 * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method,
+	 * that exception will not be overridden. The consumer thread will throw that exception upon
+	 * calling {@link #pollNext()}, rather than the {@code ClosedException}.
+	 */
+	@Override
+	public void close() {
+		synchronized (lock) {
+			next = null;
+			wakeupProducer = false;
+
+			if (error == null) {
+				error = new ClosedException();
+			}
+			lock.notifyAll();
+		}
+	}
+
+	/**
+	 * Wakes the producer thread up. If the producer thread is currently blocked in
+	 * the {@link #produce(ConsumerRecords)} method, it will exit the method throwing
+	 * a {@link WakeupException}.
+	 */
+	public void wakeupProducer() {
+		synchronized (lock) {
+			wakeupProducer = true;
+			lock.notifyAll();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * An exception thrown by the Handover in the {@link #pollNext()} or
+	 * {@link #produce(ConsumerRecords)} method, after the Handover was closed via
+	 * {@link #close()}.
+	 */
+	public static final class ClosedException extends Exception {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/**
+	 * A special exception thrown by the Handover in the {@link #produce(ConsumerRecords)}
+	 * method when the producer is woken up from a blocking call via {@link #wakeupProducer()}.
+	 */
+	public static final class WakeupException extends Exception {
+		private static final long serialVersionUID = 1L;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index acdcb61..d495327 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -23,10 +23,8 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
@@ -34,30 +32,23 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
  * 
  * @param <T> The type of elements produced by the fetcher.
  */
-public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable {
+public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class);
 
@@ -66,36 +57,15 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 	/** The schema to convert between Kafka's byte messages, and Flink's objects */
 	private final KeyedDeserializationSchema<T> deserializer;
 
-	/** The configuration for the Kafka consumer */
-	private final Properties kafkaProperties;
+	/** The handover of data and exceptions between the consumer thread and the task thread */
+	private final Handover handover;
 
-	/** The maximum number of milliseconds to wait for a fetch batch */
-	private final long pollTimeout;
-
-	/** The next offsets that the main thread should commit */
-	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
-	
-	/** The callback invoked by Kafka once an offset commit is complete */
-	private final OffsetCommitCallback offsetCommitCallback;
-
-	/** Reference to the Kafka consumer, once it is created */
-	private volatile KafkaConsumer<byte[], byte[]> consumer;
-	
-	/** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
-	private volatile ExceptionProxy errorHandler;
+	/** The thread that runs the actual KafkaConsumer and hands the record batches to this fetcher */
+	private final KafkaConsumerThread consumerThread;
 
 	/** Flag to mark the main work loop as alive */
 	private volatile boolean running = true;
 
-	/** Flag tracking whether the latest commit request has completed */
-	private volatile boolean commitInProgress;
-
-	/** For Debug output **/
-	private String taskNameWithSubtasks;
-
-	/** We get this from the outside to publish metrics. **/
-	private MetricGroup metricGroup;
-
 	// ------------------------------------------------------------------------
 
 	public Kafka09Fetcher(
@@ -125,16 +95,26 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 				useMetrics);
 
 		this.deserializer = deserializer;
-		this.kafkaProperties = kafkaProperties;
-		this.pollTimeout = pollTimeout;
-		this.nextOffsetsToCommit = new AtomicReference<>();
-		this.offsetCommitCallback = new CommitCallback();
-		this.taskNameWithSubtasks = taskNameWithSubtasks;
-		this.metricGroup = metricGroup;
+		this.handover = new Handover();
+
+		final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
+		addOffsetStateGauge(kafkaMetricGroup);
 
 		// if checkpointing is enabled, we are not automatically committing to Kafka.
-		kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+		kafkaProperties.setProperty(
+				ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
 				Boolean.toString(!enableCheckpointing));
+		
+		this.consumerThread = new KafkaConsumerThread(
+				LOG,
+				handover,
+				kafkaProperties,
+				subscribedPartitions(),
+				kafkaMetricGroup,
+				createCallBridge(),
+				getFetcherName() + " for " + taskNameWithSubtasks,
+				pollTimeout,
+				useMetrics);
 	}
 
 	// ------------------------------------------------------------------------
@@ -143,133 +123,26 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 
 	@Override
 	public void runFetchLoop() throws Exception {
-		this.errorHandler = new ExceptionProxy(Thread.currentThread());
-
-		// rather than running the main fetch loop directly here, we spawn a dedicated thread
-		// this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
-		Thread runner = new Thread(this, getFetcherName() + " for " + taskNameWithSubtasks);
-		runner.setDaemon(true);
-		runner.start();
-
 		try {
-			runner.join();
-		} catch (InterruptedException e) {
-			// may be the result of a wake-up after an exception. we ignore this here and only
-			// restore the interruption state
-			Thread.currentThread().interrupt();
-		}
-
-		// make sure we propagate any exception that occurred in the concurrent fetch thread,
-		// before leaving this method
-		this.errorHandler.checkAndThrowException();
-	}
-
-	@Override
-	public void cancel() {
-		// flag the main thread to exit
-		running = false;
-
-		// NOTE:
-		//   - We cannot interrupt the runner thread, because the Kafka consumer may
-		//     deadlock when the thread is interrupted while in certain methods
-		//   - We cannot call close() on the consumer, because it will actually throw
-		//     an exception if a concurrent call is in progress
-
-		// make sure the consumer finds out faster that we are shutting down 
-		if (consumer != null) {
-			consumer.wakeup();
-		}
-	}
-
-	@Override
-	public void run() {
-		// This method initializes the KafkaConsumer and guarantees it is torn down properly.
-		// This is important, because the consumer has multi-threading issues,
-		// including concurrent 'close()' calls.
-
-		final KafkaConsumer<byte[], byte[]> consumer;
-		try {
-			consumer = new KafkaConsumer<>(kafkaProperties);
-		}
-		catch (Throwable t) {
-			running = false;
-			errorHandler.reportError(t);
-			return;
-		}
-
-		// from here on, the consumer will be closed properly
-		try {
-			assignPartitionsToConsumer(consumer, convertKafkaPartitions(subscribedPartitions()));
-
-			if (useMetrics) {
-				final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
-				addOffsetStateGauge(kafkaMetricGroup);
-				// register Kafka metrics to Flink
-				Map<MetricName, ? extends Metric> metrics = consumer.metrics();
-				if (metrics == null) {
-					// MapR's Kafka implementation returns null here.
-					LOG.info("Consumer implementation does not support metrics");
-				} else {
-					// we have Kafka metrics, register them
-					for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
-						kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
-					}
-				}
-			}
-
-			// seek the consumer to the initial offsets
-			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-				if (partition.isOffsetDefined()) {
-					LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
-						"to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
-
-					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
-				} else {
-					// for partitions that do not have offsets restored from a checkpoint/savepoint,
-					// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
-					// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
-
-					long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
-
-					LOG.info("Partition {} has no initial offset; the consumer has position {}, so the initial offset " +
-						"will be set to {}", partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
-
-					// the fetched offset represents the next record to process, so we need to subtract it by 1
-					partition.setOffset(fetchedOffset - 1);
-				}
-			}
+			final Handover handover = this.handover;
 
-			// from now on, external operations may call the consumer
-			this.consumer = consumer;
+			// kick off the actual Kafka consumer
+			consumerThread.start();
 
-			// main fetch loop
 			while (running) {
-
-				// check if there is something to commit
-				final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
-				if (toCommit != null && !commitInProgress) {
-					// reset the work-to-be committed, so we don't repeatedly commit the same
-					// also record that a commit is already in progress
-					commitInProgress = true;
-					consumer.commitAsync(toCommit, offsetCommitCallback);
-				}
-
-				// get the next batch of records
-				final ConsumerRecords<byte[], byte[]> records;
-				try {
-					records = consumer.poll(pollTimeout);
-				}
-				catch (WakeupException we) {
-					continue;
-				}
+				// this blocks until we get the next records
+				// it automatically re-throws exceptions encountered in the fetcher thread
+				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
 
 				// get the records for each topic partition
 				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-					
-					List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle());
+
+					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
+							records.records(partition.getKafkaPartitionHandle());
 
 					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-						T value = deserializer.deserialize(
+
+						final T value = deserializer.deserialize(
 								record.key(), record.value(),
 								record.topic(), record.partition(), record.offset());
 
@@ -279,32 +152,37 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 							break;
 						}
 
-						// emit the actual record. this also update offset state atomically
+						// emit the actual record. this also updates offset state atomically
 						// and deals with timestamps and watermark generation
 						emitRecord(value, partition, record.offset(), record);
 					}
 				}
 			}
-			// end main fetch loop
-		}
-		catch (Throwable t) {
-			if (running) {
-				running = false;
-				errorHandler.reportError(t);
-			} else {
-				LOG.debug("Stopped ConsumerThread threw exception", t);
-			}
 		}
 		finally {
-			try {
-				consumer.close();
-			}
-			catch (Throwable t) {
-				LOG.warn("Error while closing Kafka 0.9 consumer", t);
-			}
+			// this signals the consumer thread that no more work is to be done
+			consumerThread.shutdown();
+		}
+
+		// on a clean exit, wait for the runner thread
+		try {
+			consumerThread.join();
+		}
+		catch (InterruptedException e) {
+			// may be the result of a wake-up interruption after an exception.
+			// we ignore this here and only restore the interruption state
+			Thread.currentThread().interrupt();
 		}
 	}
 
+	@Override
+	public void cancel() {
+		// flag the main thread to exit. A thread interrupt will come anyways.
+		running = false;
+		handover.close();
+		consumerThread.shutdown();
+	}
+
 	// ------------------------------------------------------------------------
 	//  The below methods are overridden in the 0.10 fetcher, which otherwise
 	//   reuses most of the 0.9 fetcher behavior
@@ -320,14 +198,17 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		emitRecord(record, partition, offset);
 	}
 
-	protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) {
-		consumer.assign(topicPartitions);
-	}
-
+	/**
+	 * Gets the name of this fetcher, for thread naming and logging purposes.
+	 */
 	protected String getFetcherName() {
 		return "Kafka 0.9 Fetcher";
 	}
 
+	protected KafkaConsumerCallBridge createCallBridge() {
+		return new KafkaConsumerCallBridge();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Implement Methods of the AbstractFetcher
 	// ------------------------------------------------------------------------
@@ -355,37 +236,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		}
 
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
-		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
-			LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
-					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
-					"This does not compromise Flink's checkpoint integrity.");
-		}
-		if (consumer != null) {
-			consumer.wakeup();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	public static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
-		ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
-		for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
-			result.add(p.getKafkaPartitionHandle());
-		}
-		return result;
-	}
-
-	private class CommitCallback implements OffsetCommitCallback {
-
-		@Override
-		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
-			commitInProgress = false;
-
-			if (ex != null) {
-				LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
-			}
-		}
+		consumerThread.setOffsetsToCommit(offsetsToCommit);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
new file mode 100644
index 0000000..c17aae6
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
+ * for example changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need two versions whose compiled code goes against different method signatures.
+ * Even though the source of subclasses may look identical, the byte code will be different, because they
+ * are compiled against different dependencies.
+ */
+public class KafkaConsumerCallBridge {
+
+	public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
+		consumer.assign(topicPartitions);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
new file mode 100644
index 0000000..9cfa840
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread that runs the {@link KafkaConsumer}, connecting to the brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will
+ * deserialize and emit the records.
+ * 
+ * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, and to even
+ * deadlock in certain situations.
+ * 
+ * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+	/** Logger for this consumer */
+	private final Logger log;
+
+	/** The handover of data and exceptions between the consumer thread and the task thread */
+	private final Handover handover;
+
+	/** The next offsets that the main thread should commit */
+	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
+
+	/** The configuration for the Kafka consumer */
+	private final Properties kafkaProperties;
+
+	/** The partitions that this consumer reads from */ 
+	private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
+
+	/** We get this from the outside to publish metrics. **/
+	private final MetricGroup kafkaMetricGroup;
+
+	/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */
+	private final KafkaConsumerCallBridge consumerCallBridge;
+
+	/** The maximum number of milliseconds to wait for a fetch batch */
+	private final long pollTimeout;
+
+	/** Flag whether to add Kafka's metrics to the Flink metrics */
+	private final boolean useMetrics;
+
+	/** Reference to the Kafka consumer, once it is created */
+	private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+	/** Flag to mark the main work loop as alive */
+	private volatile boolean running;
+
+	/** Flag tracking whether the latest commit request has completed */
+	private volatile boolean commitInProgress;
+
+
+	public KafkaConsumerThread(
+			Logger log,
+			Handover handover,
+			Properties kafkaProperties,
+			KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions,
+			MetricGroup kafkaMetricGroup,
+			KafkaConsumerCallBridge consumerCallBridge,
+			String threadName,
+			long pollTimeout,
+			boolean useMetrics) {
+
+		super(threadName);
+		setDaemon(true);
+
+		this.log = checkNotNull(log);
+		this.handover = checkNotNull(handover);
+		this.kafkaProperties = checkNotNull(kafkaProperties);
+		this.subscribedPartitions = checkNotNull(subscribedPartitions);
+		this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
+		this.consumerCallBridge = checkNotNull(consumerCallBridge);
+		this.pollTimeout = pollTimeout;
+		this.useMetrics = useMetrics;
+
+		this.nextOffsetsToCommit = new AtomicReference<>();
+		this.running = true;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void run() {
+		// early exit check
+		if (!running) {
+			return;
+		}
+
+		// this is the means to talk to FlinkKafkaConsumer's main thread
+		final Handover handover = this.handover;
+
+		// This method initializes the KafkaConsumer and guarantees it is torn down properly.
+		// This is important, because the consumer has multi-threading issues,
+		// including concurrent 'close()' calls.
+		final KafkaConsumer<byte[], byte[]> consumer;
+		try {
+			consumer = new KafkaConsumer<>(kafkaProperties);
+		}
+		catch (Throwable t) {
+			handover.reportError(t);
+			return;
+		}
+
+		// from here on, the consumer is guaranteed to be closed properly
+		try {
+			// The callback invoked by Kafka once an offset commit is complete
+			final OffsetCommitCallback offsetCommitCallback = new CommitCallback();
+
+			// tell the consumer which partitions to work with
+			consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));
+
+			// register Kafka's very own metrics in Flink's metric reporters
+			if (useMetrics) {
+				// register Kafka metrics to Flink
+				Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+				if (metrics == null) {
+					// MapR's Kafka implementation returns null here.
+					log.info("Consumer implementation does not support metrics");
+				} else {
+					// we have Kafka metrics, register them
+					for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+						kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+					}
+				}
+			}
+
+			// early exit check
+			if (!running) {
+				return;
+			}
+
+			// seek the consumer to the initial offsets
+			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
+				if (partition.isOffsetDefined()) {
+					log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " +
+							"seeking the consumer to position {}",
+							partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
+
+					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+				}
+				else {
+					// for partitions that do not have offsets restored from a checkpoint/savepoint,
+					// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
+					// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
+
+					long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
+
+					log.info("Partition {} has no initial offset; the consumer has position {}, " +
+							"so the initial offset will be set to {}",
+							partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
+
+					// the fetched offset represents the next record to process, so we need to subtract it by 1
+					partition.setOffset(fetchedOffset - 1);
+				}
+			}
+
+			// from now on, external operations may call the consumer
+			this.consumer = consumer;
+
+			// the latest bulk of records. may carry across the loop if the thread is woken up
+			// from blocking on the handover
+			ConsumerRecords<byte[], byte[]> records = null;
+
+			// main fetch loop
+			while (running) {
+
+				// check if there is something to commit
+				if (!commitInProgress) {
+					// get and reset the work-to-be committed, so we don't repeatedly commit the same
+					final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
+
+					if (toCommit != null) {
+						log.debug("Sending async offset commit request to Kafka broker");
+
+						// also record that a commit is already in progress
+						// the order here matters! first set the flag, then send the commit command.
+						commitInProgress = true;
+						consumer.commitAsync(toCommit, offsetCommitCallback);
+					}
+				}
+
+				// get the next batch of records, unless the previous batch has not been handed over yet
+				if (records == null) {
+					try {
+						records = consumer.poll(pollTimeout);
+					}
+					catch (WakeupException we) {
+						continue;
+					}
+				}
+
+				try {
+					handover.produce(records);
+					records = null;
+				}
+				catch (Handover.WakeupException e) {
+					// fall through the loop
+				}
+			}
+			// end main fetch loop
+		}
+		catch (Throwable t) {
+			// let the main thread know and exit
+			// this exception may occur because the main thread closed the handover, in which
+			// case the reporting below is irrelevant, but it does not hurt either
+			handover.reportError(t);
+		}
+		finally {
+			// make sure the handover is closed if it is not already closed or has an error
+			handover.close();
+
+			// make sure the KafkaConsumer is closed
+			try {
+				consumer.close();
+			}
+			catch (Throwable t) {
+				log.warn("Error while closing Kafka consumer", t);
+			}
+		}
+	}
+
+	/**
+	 * Shuts this thread down, waking up the thread gracefully if blocked (without Thread.interrupt() calls).
+	 */
+	public void shutdown() {
+		running = false;
+
+		// We cannot call close() on the KafkaConsumer, because it will actually throw
+		// an exception if a concurrent call is in progress
+
+		// this wakes up the consumer if it is blocked handing over records
+		handover.wakeupProducer();
+
+		// this wakes up the consumer if it is blocked in a Kafka poll
+		if (consumer != null) {
+			consumer.wakeup();
+		}
+	}
+
+	/**
+	 * Tells this thread to commit a set of offsets. This method does not block; the commit
+	 * operation happens asynchronously.
+	 *
+	 * <p>Only one commit operation may be pending at any time. If committing takes longer than
+	 * the interval at which this method is called, some commits may be skipped because they are
+	 * superseded by newer ones.
+	 *
+	 * @param offsetsToCommit The offsets to commit
+	 */
+	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
+		// record the offsets to be committed by the consumer thread and make sure the thread notices them
+		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
+			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
+					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
+					"This does not compromise Flink's checkpoint integrity.");
+		}
+
+		// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
+		handover.wakeupProducer();
+		if (consumer != null) {
+			consumer.wakeup();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
+		ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
+		for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+			result.add(p.getKafkaPartitionHandle());
+		}
+		return result;
+	}
+
+	// ------------------------------------------------------------------------
+
+	private class CommitCallback implements OffsetCommitCallback {
+
+		@Override
+		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
+			commitInProgress = false;
+
+			if (ex != null) {
+				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+			}
+		}
+	}
+}
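
A rough sketch of how the fetcher's main thread is expected to drive the class above: it pulls
batches via Handover.pollNext() (the call exercised in HandoverTest below), emits them, and hands
completed checkpoint offsets back through setOffsetsToCommit(). The loop shape and emitRecords()
here are illustrative placeholders, not the actual Kafka09Fetcher code.

    // sketch only: main-thread side of the handover (emitRecords() is a placeholder)
    void runFetchLoopSketch(Handover handover, KafkaConsumerThread consumerThread) throws Exception {
        try {
            while (true) {
                // blocks until the consumer thread hands over the next batch;
                // throws Handover.ClosedException once the handover is closed
                ConsumerRecords<byte[], byte[]> batch = handover.pollNext();
                emitRecords(batch); // placeholder for deserialization + emission to the SourceContext
            }
        }
        catch (Handover.ClosedException e) {
            // regular shutdown path: the handover was closed, e.g. on cancellation
        }
        finally {
            consumerThread.shutdown();
        }
    }

    // on a completed checkpoint, offsets are committed asynchronously:
    // consumerThread.setOffsetsToCommit(offsetsOfCompletedCheckpoint);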

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 1162599..7a82365 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -44,6 +48,7 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -53,6 +58,7 @@ import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -68,7 +74,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
  * Unit tests for the {@link Kafka09Fetcher}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(Kafka09Fetcher.class)
+@PrepareForTest(KafkaConsumerThread.class)
 public class Kafka09FetcherTest {
 
 	@Test
@@ -124,7 +130,7 @@ public class Kafka09FetcherTest {
 				this.getClass().getClassLoader(),
 				true, /* checkpointing */
 				"task_name",
-				mock(MetricGroup.class),
+				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
@@ -174,7 +180,7 @@ public class Kafka09FetcherTest {
 
 		// check that there were no errors in the fetcher
 		final Throwable fetcherError = error.get();
-		if (fetcherError != null) {
+		if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
 			throw new Exception("Exception in the fetcher", fetcherError);
 		}
 		final Throwable committerError = commitError.get();
@@ -260,7 +266,7 @@ public class Kafka09FetcherTest {
 				this.getClass().getClassLoader(),
 				true, /* checkpointing */
 				"task_name",
-				mock(MetricGroup.class),
+				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
@@ -323,8 +329,154 @@ public class Kafka09FetcherTest {
 
 		// check that there were no errors in the fetcher
 		final Throwable caughtError = error.get();
-		if (caughtError != null) {
+		if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
 			throw new Exception("Exception in the fetcher", caughtError);
 		}
 	}
+
+	@Test
+	public void testCancellationWhenEmitBlocks() throws Exception {
+
+		// ----- some test data -----
+
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- build a fetcher -----
+
+		BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+				sourceContext,
+				topics,
+				null, /* periodic watermark extractor */
+				null, /* punctuated watermark extractor */
+				new TestProcessingTimeService(),
+				10, /* watermark interval */
+				this.getClass().getClassLoader(),
+				true, /* checkpointing */
+				"task_name",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				false);
+
+
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// wait until the fetcher thread has started emitting records to the source context
+		sourceContext.waitTillHasBlocker();
+
+		// now we try to cancel the fetcher, including the interruption usually done on the task thread
+		// once it has finished, there must be no more thread blocked on the source context
+		fetcher.cancel();
+		fetcherRunner.interrupt();
+		fetcherRunner.join();
+
+		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+	}
+
+	// ------------------------------------------------------------------------
+	//  test utilities
+	// ------------------------------------------------------------------------
+
+	private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+		private final ReentrantLock lock = new ReentrantLock();
+		private final OneShotLatch inBlocking = new OneShotLatch();
+
+		@Override
+		public void collect(T element) {
+			block();
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			block();
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			block();
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return new Object();
+		}
+
+		@Override
+		public void close() {}
+
+		public void waitTillHasBlocker() throws InterruptedException {
+			inBlocking.await();
+		}
+
+		public boolean isStillBlocking() {
+			return lock.isLocked();
+		}
+
+		@SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+		private void block() {
+			lock.lock();
+			try {
+				inBlocking.trigger();
+
+				// put this thread to sleep indefinitely
+				final Object o = new Object();
+				while (true) {
+					synchronized (o) {
+						o.wait();
+					}
+				}
+			}
+			catch (InterruptedException e) {
+				// exit cleanly, simply reset the interruption flag
+				Thread.currentThread().interrupt();
+			}
+			finally {
+				lock.unlock();
+			}
+		}
+	}
 }
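
One detail the mock above glosses over: the real KafkaConsumer unblocks a pending poll() when
wakeup() is called by throwing a WakeupException, which is the mechanism KafkaConsumerThread's
shutdown() and setOffsetsToCommit() rely on. Should a test ever need to model that, a sketch along
these lines would do; the wakeupFlag wiring is made up for illustration, only the Mockito and
Kafka calls shown are real, and mockConsumer / consumerRecords refer to the variables used in the
test above.

    // sketch: make the mocked consumer's poll() honor wakeup(), like the real client does
    final AtomicBoolean wakeupFlag = new AtomicBoolean(false);

    doAnswer(new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) {
            wakeupFlag.set(true);
            return null;
        }
    }).when(mockConsumer).wakeup();

    when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
        @Override
        public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
            if (wakeupFlag.compareAndSet(true, false)) {
                // mirrors the org.apache.kafka.common.errors.WakeupException thrown by the real consumer
                throw new org.apache.kafka.common.errors.WakeupException();
            }
            return consumerRecords;
        }
    });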

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
deleted file mode 100644
index c1b21b7..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention09ITCase extends KafkaShortRetentionTestBase {
-
-	@Test(timeout=60000)
-	public void testAutoOffsetReset() throws Exception {
-		runAutoOffsetResetTest();
-	}
-
-	@Test(timeout=60000)
-	public void testAutoOffsetResetNone() throws Exception {
-		runFailOnAutoOffsetResetNone();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
new file mode 100644
index 0000000..25040eb
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the {@link Handover} between the Kafka consumer thread and the fetcher's main thread.
+ */
+public class HandoverTest {
+
+	// ------------------------------------------------------------------------
+	//  test producer / consumer
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testWithVariableProducer() throws Exception {
+		runProducerConsumerTest(500, 2, 0);
+	}
+
+	@Test
+	public void testWithVariableConsumer() throws Exception {
+		runProducerConsumerTest(500, 0, 2);
+	}
+
+	@Test
+	public void testWithVariableBoth() throws Exception {
+		runProducerConsumerTest(500, 2, 2);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test error propagation
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testPublishErrorOnEmptyHandover() throws Exception {
+		final Handover handover = new Handover();
+
+		Exception error = new Exception();
+		handover.reportError(error);
+
+		try {
+			handover.pollNext();
+			fail("should throw an exception");
+		}
+		catch (Exception e) {
+			assertEquals(error, e);
+		}
+	}
+
+	@Test
+	public void testPublishErrorOnFullHandover() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		IOException error = new IOException();
+		handover.reportError(error);
+
+		try {
+			handover.pollNext();
+			fail("should throw an exception");
+		}
+		catch (Exception e) {
+			assertEquals(error, e);
+		}
+	}
+
+	@Test
+	public void testExceptionMarksClosedOnEmpty() throws Exception {
+		final Handover handover = new Handover();
+
+		IllegalStateException error = new IllegalStateException();
+		handover.reportError(error);
+
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testExceptionMarksClosedOnFull() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		LinkageError error = new LinkageError();
+		handover.reportError(error);
+
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test closing behavior
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testCloseEmptyForConsumer() throws Exception {
+		final Handover handover = new Handover();
+		handover.close();
+
+		try {
+			handover.pollNext();
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testCloseFullForConsumer() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+		handover.close();
+
+		try {
+			handover.pollNext();
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testCloseEmptyForProducer() throws Exception {
+		final Handover handover = new Handover();
+		handover.close();
+
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testCloseFullForProducer() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+		handover.close();
+
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test wake up behavior
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
+		Handover handover = new Handover();
+		handover.wakeupProducer();
+
+		// produce into a woken but empty handover
+		try {
+			handover.produce(createTestRecords());
+		}
+		catch (Handover.WakeupException e) {
+			fail();
+		}
+
+		// the handover now holds records; the next wakeup-and-produce must
+		// throw an exception
+		handover.wakeupProducer();
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.WakeupException e) {
+			// expected
+		}
+
+		// empty the handover
+		assertNotNull(handover.pollNext());
+		
+		// producing into an empty handover should work
+		try {
+			handover.produce(createTestRecords());
+		}
+		catch (Handover.WakeupException e) {
+			fail();
+		}
+	}
+
+	@Test
+	public void testWakeupWakesOnlyOnce() throws Exception {
+		// create a full handover
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		handover.wakeupProducer();
+
+		try {
+			handover.produce(createTestRecords());
+			fail();
+		} catch (WakeupException e) {
+			// expected
+		}
+
+		CheckedThread producer = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				handover.produce(createTestRecords());
+			}
+		};
+		producer.start();
+
+		// the producer must block now
+		producer.waitUntilThreadHoldsLock(10000);
+
+		// release the thread by consuming something
+		assertNotNull(handover.pollNext());
+		producer.sync();
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception {
+		// generate test data
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final ConsumerRecords<byte[], byte[]>[] data = new ConsumerRecords[numRecords];
+		for (int i = 0; i < numRecords; i++) {
+			data[i] = createTestRecords();
+		}
+
+		final Handover handover = new Handover();
+
+		ProducerThread producer = new ProducerThread(handover, data, maxProducerDelay);
+		ConsumerThread consumer = new ConsumerThread(handover, data, maxConsumerDelay);
+
+		consumer.start();
+		producer.start();
+
+		// sync first on the consumer, so it propagates assertion errors
+		consumer.sync();
+		producer.sync();
+	}
+
+	@SuppressWarnings("unchecked")
+	private static ConsumerRecords<byte[], byte[]> createTestRecords() {
+		return mock(ConsumerRecords.class);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private abstract static class CheckedThread extends Thread {
+
+		private volatile Throwable error;
+
+		public abstract void go() throws Exception;
+
+		@Override
+		public void run() {
+			try {
+				go();
+			}
+			catch (Throwable t) {
+				error = t;
+			}
+		}
+
+		public void sync() throws Exception {
+			join();
+			if (error != null) {
+				ExceptionUtils.rethrowException(error, error.getMessage());
+			}
+		}
+
+		public void waitUntilThreadHoldsLock(long timeoutMillis) throws InterruptedException, TimeoutException {
+			final long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
+			
+			while (!isBlockedOrWaiting() && (System.nanoTime() < deadline)) {
+				Thread.sleep(1);
+			}
+
+			if (!isBlockedOrWaiting()) {
+				throw new TimeoutException();
+			}
+		}
+
+		private boolean isBlockedOrWaiting() {
+			State state = getState();
+			return state == State.BLOCKED || state == State.WAITING || state == State.TIMED_WAITING;
+		}
+	}
+
+	private static class ProducerThread extends CheckedThread {
+
+		private final Random rnd = new Random();
+		private final Handover handover;
+		private final ConsumerRecords<byte[], byte[]>[] data;
+		private final int maxDelay;
+
+		private ProducerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+			this.handover = handover;
+			this.data = data;
+			this.maxDelay = maxDelay;
+		}
+
+		@Override
+		public void go() throws Exception {
+			for (ConsumerRecords<byte[], byte[]> rec : data) {
+				handover.produce(rec);
+
+				if (maxDelay > 0) {
+					int delay = rnd.nextInt(maxDelay);
+					Thread.sleep(delay);
+				}
+			}
+		}
+	}
+
+	private static class ConsumerThread extends CheckedThread {
+
+		private final Random rnd = new Random();
+		private final Handover handover;
+		private final ConsumerRecords<byte[], byte[]>[] data;
+		private final int maxDelay;
+
+		private ConsumerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+			this.handover = handover;
+			this.data = data;
+			this.maxDelay = maxDelay;
+		}
+
+		@Override
+		public void go() throws Exception {
+			for (ConsumerRecords<byte[], byte[]> rec : data) {
+				ConsumerRecords<byte[], byte[]> next = handover.pollNext();
+
+				assertEquals(rec, next);
+
+				if (maxDelay > 0) {
+					int delay = rnd.nextInt(maxDelay);
+					Thread.sleep(delay);
+				}
+			}
+		}
+	}
+}
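
Read together, the wakeup tests pin down that wakeupProducer() does not interrupt a produce() into
an empty handover and affects at most one produce() attempt. The resulting producer-side idiom is
the same carry-over pattern used in KafkaConsumerThread's main loop above, sketched here with
fetchNextBatch() as a hypothetical stand-in for the actual KafkaConsumer.poll() call:

    // sketch of the producer-side carry-over idiom (fetchNextBatch() is hypothetical)
    ConsumerRecords<byte[], byte[]> batch = null;
    while (running) {
        if (batch == null) {
            batch = fetchNextBatch(); // e.g. a KafkaConsumer.poll(...)
        }
        try {
            handover.produce(batch);
            batch = null; // handed over successfully, fetch a fresh batch next iteration
        }
        catch (Handover.WakeupException e) {
            // keep the batch; re-check the running flag and pending commits before retrying
        }
    }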

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 5c03b78..dccf698 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -122,6 +122,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 	 *
 	 */
 	private static boolean stopProducer = false;
+
 	public void runAutoOffsetResetTest() throws Exception {
 		final String topic = "auto-offset-reset-test";