You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:24 UTC

[34/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
new file mode 100644
index 0000000..c17aae6
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
+ * for example changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need two versions whose compiled code goes against different method signatures.
+ * Even though the source of subclasses may look identical, the byte code will be different, because they
+ * are compiled against different dependencies.
+ */
+public class KafkaConsumerCallBridge {
+
+	/**
+	 * Assigns the given partitions to the consumer by delegating to the consumer's
+	 * {@code assign(...)} method.
+	 *
+	 * @param consumer The Kafka consumer that the partitions are assigned to
+	 * @param topicPartitions The partitions that the consumer should work with
+	 * @throws Exception Any failure raised while assigning the partitions
+	 */
+	public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
+		consumer.assign(topicPartitions);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
new file mode 100644
index 0000000..9cfa840
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread that runs the {@link KafkaConsumer}, connecting to the brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will
+ * deserialize and emit the records.
+ * 
+ * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, and to even
+ * deadlock in certain situations.
+ * 
+ * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+	/** Logger for this consumer */
+	private final Logger log;
+
+	/** The handover of data and exceptions between the consumer thread and the task thread */
+	private final Handover handover;
+
+	/** The next offsets that this consumer thread should commit, handed in via setOffsetsToCommit() */
+	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
+
+	/** The configuration for the Kafka consumer */
+	private final Properties kafkaProperties;
+
+	/** The partitions that this consumer reads from */ 
+	private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
+
+	/** We get this from the outside to publish metrics. **/
+	private final MetricGroup kafkaMetricGroup;
+
+	/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */
+	private final KafkaConsumerCallBridge consumerCallBridge;
+
+	/** The maximum number of milliseconds to wait for a fetch batch */
+	private final long pollTimeout;
+
+	/** Flag whether to add Kafka's metrics to the Flink metrics */
+	private final boolean useMetrics;
+
+	/** Reference to the Kafka consumer, once it is created */
+	private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+	/** Flag to mark the main work loop as alive */
+	private volatile boolean running;
+
+	/** Flag tracking whether an offset commit request is in flight and has not yet completed */
+	private volatile boolean commitInProgress;
+
+
+	/**
+	 * Creates the consumer thread as a named daemon thread. The thread is not started here.
+	 *
+	 * @param log The logger to use for this consumer
+	 * @param handover The handover through which records and errors are passed on
+	 * @param kafkaProperties The configuration for the Kafka consumer
+	 * @param subscribedPartitions The partitions that this consumer reads from
+	 * @param kafkaMetricGroup The metric group under which Kafka's metrics are registered
+	 * @param consumerCallBridge The indirection for KafkaConsumer calls that changed signature
+	 * @param threadName The name of this thread
+	 * @param pollTimeout The maximum number of milliseconds to wait for a fetch batch
+	 * @param useMetrics Whether to add Kafka's metrics to the Flink metrics
+	 */
+	public KafkaConsumerThread(
+			Logger log,
+			Handover handover,
+			Properties kafkaProperties,
+			KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions,
+			MetricGroup kafkaMetricGroup,
+			KafkaConsumerCallBridge consumerCallBridge,
+			String threadName,
+			long pollTimeout,
+			boolean useMetrics) {
+
+		super(threadName);
+		setDaemon(true);
+
+		// plain configuration values
+		this.useMetrics = useMetrics;
+		this.pollTimeout = pollTimeout;
+
+		// mandatory collaborators, none of which may be null
+		this.log = checkNotNull(log);
+		this.handover = checkNotNull(handover);
+		this.kafkaProperties = checkNotNull(kafkaProperties);
+		this.subscribedPartitions = checkNotNull(subscribedPartitions);
+		this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
+		this.consumerCallBridge = checkNotNull(consumerCallBridge);
+
+		// the thread starts out alive, with no offsets waiting to be committed
+		this.running = true;
+		this.nextOffsetsToCommit = new AtomicReference<>();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the {@link KafkaConsumer}, assigns the subscribed partitions, positions the consumer,
+	 * and then loops: send a pending offset commit (if any), poll records, and pass them to the handover.
+	 *
+	 * <p>Throwables are reported through the handover rather than propagated. On exit, the handover
+	 * is closed and the consumer is closed by this thread.
+	 */
+	@Override
+	public void run() {
+		// early exit check
+		if (!running) {
+			return;
+		}
+
+		// this is the means to talk to FlinkKafkaConsumer's main thread
+		final Handover handover = this.handover;
+
+		// This method initializes the KafkaConsumer and guarantees it is torn down properly.
+		// This is important, because the consumer has multi-threading issues,
+		// including concurrent 'close()' calls.
+		final KafkaConsumer<byte[], byte[]> consumer;
+		try {
+			consumer = new KafkaConsumer<>(kafkaProperties);
+		}
+		catch (Throwable t) {
+			handover.reportError(t);
+			return;
+		}
+
+		// from here on, the consumer is guaranteed to be closed properly
+		try {
+			// The callback invoked by Kafka once an offset commit is complete
+			final OffsetCommitCallback offsetCommitCallback = new CommitCallback();
+
+			// tell the consumer which partitions to work with
+			consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));
+
+			// register Kafka's very own metrics in Flink's metric reporters
+			if (useMetrics) {
+				// register Kafka metrics to Flink
+				Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+				if (metrics == null) {
+					// MapR's Kafka implementation returns null here.
+					log.info("Consumer implementation does not support metrics");
+				} else {
+					// we have Kafka metrics, register them
+					for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+						kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+					}
+				}
+			}
+
+			// early exit check
+			if (!running) {
+				return;
+			}
+
+			// seek the consumer to the initial offsets
+			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
+				if (partition.isOffsetDefined()) {
+					log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " +
+							"seeking the consumer to position {}",
+							partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
+
+					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+				}
+				else {
+					// for partitions that do not have offsets restored from a checkpoint/savepoint,
+					// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
+					// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
+
+					long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
+
+					log.info("Partition {} has no initial offset; the consumer has position {}, " +
+							"so the initial offset will be set to {}",
+							partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
+
+					// the fetched offset represents the next record to process, so we need to subtract it by 1
+					partition.setOffset(fetchedOffset - 1);
+				}
+			}
+
+			// from now on, external operations may call the consumer
+			this.consumer = consumer;
+
+			// the latest bulk of records. may carry across the loop if the thread is woken up
+			// from blocking on the handover
+			ConsumerRecords<byte[], byte[]> records = null;
+
+			// main fetch loop
+			while (running) {
+
+				// check if there is something to commit
+				if (!commitInProgress) {
+					// get and reset the work-to-be committed, so we don't repeatedly commit the same
+					final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
+
+					if (toCommit != null) {
+						log.debug("Sending async offset commit request to Kafka broker");
+
+						// also record that a commit is already in progress
+						// the order here matters! first set the flag, then send the commit command.
+						commitInProgress = true;
+						consumer.commitAsync(toCommit, offsetCommitCallback);
+					}
+				}
+
+				// get the next batch of records, unless we did not manage to hand the old batch over
+				if (records == null) {
+					try {
+						records = consumer.poll(pollTimeout);
+					}
+					catch (WakeupException we) {
+						// woken up via consumer.wakeup(), which shutdown() and setOffsetsToCommit() call;
+						// go back to the loop head to re-check the running flag and pending commits
+						continue;
+					}
+				}
+
+				try {
+					handover.produce(records);
+					records = null;
+				}
+				catch (Handover.WakeupException e) {
+					// fall through the loop
+				}
+			}
+			// end main fetch loop
+		}
+		catch (Throwable t) {
+			// let the main thread know and exit
+			// it may be that this exception comes because the main thread closed the handover, in
+			// which case the below reporting is irrelevant, but does not hurt either
+			handover.reportError(t);
+		}
+		finally {
+			// make sure the handover is closed if it is not already closed or has an error
+			handover.close();
+
+			// make sure the KafkaConsumer is closed
+			try {
+				consumer.close();
+			}
+			catch (Throwable t) {
+				log.warn("Error while closing Kafka consumer", t);
+			}
+		}
+	}
+
+	/**
+	 * Stops this thread without ever interrupting it: the main loop is flagged to exit, and the
+	 * thread is woken up from whichever blocking call it may currently be in.
+	 */
+	public void shutdown() {
+		// flag first, so that the woken-up loop observes the stop request
+		running = false;
+
+		// Note: close() must not be called on the KafkaConsumer from here, because it will
+		// actually throw an exception if a concurrent call is in progress
+
+		// release the thread if it is blocked handing over records
+		handover.wakeupProducer();
+
+		// release the thread if it is blocked in a Kafka poll; the reference stays null
+		// until the thread has finished setting up the consumer
+		final KafkaConsumer<byte[], byte[]> kafkaConsumer = consumer;
+		if (kafkaConsumer != null) {
+			kafkaConsumer.wakeup();
+		}
+	}
+
+	/**
+	 * Hands a set of offsets to this thread for committing. The call returns immediately; the
+	 * commit itself is issued asynchronously by the consumer thread.
+	 *
+	 * <p>At most one commit request is pending at any time. Offsets that have not been picked up
+	 * by the time this method is called again are replaced by the newer ones, so some commits
+	 * may be skipped when committing is slower than the rate at which this method is called.
+	 *
+	 * @param offsetsToCommit The offsets to commit
+	 */
+	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
+		// publish the new offsets, replacing whatever the consumer thread has not picked up yet
+		final Map<TopicPartition, OffsetAndMetadata> superseded = nextOffsetsToCommit.getAndSet(offsetsToCommit);
+
+		final boolean previousCommitSkipped = superseded != null;
+		if (previousCommitSkipped) {
+			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
+					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
+					"This does not compromise Flink's checkpoint integrity.");
+		}
+
+		// make the consumer thread notice the new work soon, whether it currently blocks in the
+		// handover or inside a poll()
+		handover.wakeupProducer();
+
+		final KafkaConsumer<byte[], byte[]> kafkaConsumer = consumer;
+		if (kafkaConsumer != null) {
+			kafkaConsumer.wakeup();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
+		ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
+		for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+			result.add(p.getKafkaPartitionHandle());
+		}
+		return result;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Callback passed to the asynchronous offset commit. It clears the commit-in-progress flag
+	 * of the enclosing thread and logs a warning if the commit failed.
+	 */
+	private class CommitCallback implements OffsetCommitCallback {
+
+		@Override
+		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
+			// whatever the outcome, the request is no longer in flight
+			commitInProgress = false;
+
+			final boolean commitFailed = ex != null;
+			if (commitFailed) {
+				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
new file mode 100644
index 0000000..7a82365
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Kafka09FetcherTest {
+
+	/**
+	 * Checks that {@code commitInternalOffsetsToKafka(...)} returns while the consumer is blocked
+	 * inside {@code poll()}. The mocked {@code poll()} blocks until {@code wakeup()} is called on
+	 * the mock consumer; the committing thread must finish within 30 seconds.
+	 */
+	@Test
+	public void testCommitDoesNotBlock() throws Exception {
+
+		// test data
+		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+		testCommitData.put(testPartition, 11L);
+
+		// to synchronize when the consumer is in its blocking method
+		final OneShotLatch sync = new OneShotLatch();
+
+		// ----- the mock consumer with blocking poll calls ----
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+		
+		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+				sync.trigger();
+				blockerLatch.await();
+				return ConsumerRecords.empty();
+			}
+		});
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				blockerLatch.trigger();
+				return null;
+			}
+		}).when(mockConsumer).wakeup();
+
+		// make sure the fetcher creates the mock consumer
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- create the test fetcher -----
+
+		@SuppressWarnings("unchecked")
+		SourceContext<String> sourceContext = mock(SourceContext.class);
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+				sourceContext,
+				topics,
+				null, /* periodic watermark extractor */
+				null, /* punctuated watermark extractor */
+				new TestProcessingTimeService(),
+				10, /* watermark interval */
+				this.getClass().getClassLoader(),
+				true, /* checkpointing */
+				"task_name",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				false);
+
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// wait until the fetcher has reached the method of interest
+		sync.await();
+
+		// ----- trigger the offset commit -----
+		
+		final AtomicReference<Throwable> commitError = new AtomicReference<>();
+		final Thread committer = new Thread("committer runner") {
+			@Override
+			public void run() {
+				try {
+					fetcher.commitInternalOffsetsToKafka(testCommitData);
+				} catch (Throwable t) {
+					commitError.set(t);
+				}
+			}
+		};
+		committer.start();
+
+		// ----- ensure that the committer finishes in time  -----
+		committer.join(30000);
+		assertFalse("The committer did not finish in time", committer.isAlive());
+
+		// ----- test done, wait till the fetcher is done for a clean shutdown -----
+		fetcher.cancel();
+		fetcherRunner.join();
+
+		// check that there were no errors in the fetcher
+		final Throwable fetcherError = error.get();
+		if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
+			throw new Exception("Exception in the fetcher", fetcherError);
+		}
+		final Throwable committerError = commitError.get();
+		if (committerError != null) {
+			throw new Exception("Exception in the committer", committerError);
+		}
+	}
+
+	/**
+	 * Checks that offsets handed to {@code commitInternalOffsetsToKafka(...)} reach the consumer's
+	 * {@code commitAsync(...)} call, for two consecutive commits. The mocked {@code commitAsync(...)}
+	 * records the offsets it receives and completes the callback immediately.
+	 *
+	 * <p>The expected committed offset is the handed-in offset plus one (11 -> 12, 19 -> 20 for
+	 * partition "test"); the expectations for partition "another" follow the same rule.
+	 */
+	@Test
+	public void ensureOffsetsGetCommitted() throws Exception {
+
+		// test data
+		final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+		final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+
+		final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+		testCommitData1.put(testPartition1, 11L);
+		testCommitData1.put(testPartition2, 18L);
+
+		final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+		testCommitData2.put(testPartition1, 19L);
+		testCommitData2.put(testPartition2, 28L);
+
+		final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+
+		// ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+				blockerLatch.await();
+				return ConsumerRecords.empty();
+			}
+		});
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				blockerLatch.trigger();
+				return null;
+			}
+		}).when(mockConsumer).wakeup();
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				@SuppressWarnings("unchecked")
+				Map<TopicPartition, OffsetAndMetadata> offsets =
+						(Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+				OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+				// remember what was committed and report the commit as completed right away
+				commitStore.add(offsets);
+				callback.onComplete(offsets, null);
+
+				return null;
+			}
+		}).when(mockConsumer).commitAsync(
+				Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+		// make sure the fetcher creates the mock consumer
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- create the test fetcher -----
+
+		// NOTE(review): the fetcher is created with partition ("test", 42) only, so the "another"
+		// branches below are presumably never reached - confirm against Kafka09Fetcher
+		@SuppressWarnings("unchecked")
+		SourceContext<String> sourceContext = mock(SourceContext.class);
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+				sourceContext,
+				topics,
+				null, /* periodic watermark extractor */
+				null, /* punctuated watermark extractor */
+				new TestProcessingTimeService(),
+				10, /* watermark interval */
+				this.getClass().getClassLoader(),
+				true, /* checkpointing */
+				"task_name",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				false);
+
+
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// ----- trigger the first offset commit -----
+
+		fetcher.commitInternalOffsetsToKafka(testCommitData1);
+		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+			TopicPartition partition = entry.getKey();
+			if (partition.topic().equals("test")) {
+				assertEquals(42, partition.partition());
+				// handed in 11, committed offset is one more
+				assertEquals(12L, entry.getValue().offset());
+			}
+			else if (partition.topic().equals("another")) {
+				assertEquals(99, partition.partition());
+				// handed in 18, committed offset is one more
+				assertEquals(19L, entry.getValue().offset());
+			}
+		}
+
+		// ----- trigger the second offset commit -----
+
+		fetcher.commitInternalOffsetsToKafka(testCommitData2);
+		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+			TopicPartition partition = entry.getKey();
+			if (partition.topic().equals("test")) {
+				assertEquals(42, partition.partition());
+				// handed in 19, committed offset is one more
+				assertEquals(20L, entry.getValue().offset());
+			}
+			else if (partition.topic().equals("another")) {
+				assertEquals(99, partition.partition());
+				// handed in 28, committed offset is one more
+				assertEquals(29L, entry.getValue().offset());
+			}
+		}
+
+		// ----- test done, wait till the fetcher is done for a clean shutdown -----
+		fetcher.cancel();
+		fetcherRunner.join();
+
+		// check that there were no errors in the fetcher
+		final Throwable caughtError = error.get();
+		if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
+			throw new Exception("Exception in the fetcher", caughtError);
+		}
+	}
+
+	/**
+	 * Checks that cancelling the fetcher and interrupting the thread that runs the fetch loop
+	 * leaves no thread blocked in the source context. The mocked {@code poll()} always returns
+	 * three records, and the source context blocks on every emit call.
+	 */
+	@Test
+	public void testCancellationWhenEmitBlocks() throws Exception {
+
+		// ----- some test data -----
+
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- build a fetcher -----
+
+		BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+				sourceContext,
+				topics,
+				null, /* periodic watermark extractor */
+				null, /* punctuated watermark extractor */
+				new TestProcessingTimeService(),
+				10, /* watermark interval */
+				this.getClass().getClassLoader(),
+				true, /* checkpointing */
+				"task_name",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				false);
+
+
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// wait until the thread started to emit records to the source context
+		sourceContext.waitTillHasBlocker();
+
+		// now we try to cancel the fetcher, including the interruption usually done on the task thread
+		// once it has finished, there must be no more thread blocked on the source context
+		fetcher.cancel();
+		fetcherRunner.interrupt();
+		fetcherRunner.join();
+
+		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+	}
+
+	// ------------------------------------------------------------------------
+	//  test utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A source context whose emit methods block the calling thread until it is interrupted.
+	 * Tests use it to check that a thread stuck in an emit call is released on cancellation.
+	 *
+	 * @param <T> The type of the emitted elements
+	 */
+	private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+		/** Held by a thread for as long as it is blocked inside an emit call */
+		private final ReentrantLock lock = new ReentrantLock();
+
+		/** Triggered once the first thread has entered the blocking section */
+		private final OneShotLatch inBlocking = new OneShotLatch();
+
+		/** The one checkpoint lock of this context, so that all callers share the same monitor */
+		private final Object checkpointLock = new Object();
+
+		@Override
+		public void collect(T element) {
+			block();
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			block();
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			block();
+		}
+
+		/**
+		 * Returns the same lock object on every call. Handing out a fresh object per call
+		 * would mean that two callers never synchronize on the same monitor.
+		 */
+		@Override
+		public Object getCheckpointLock() {
+			return checkpointLock;
+		}
+
+		@Override
+		public void close() {}
+
+		/** Waits until a thread has entered one of the blocking emit methods. */
+		public void waitTillHasBlocker() throws InterruptedException {
+			inBlocking.await();
+		}
+
+		/** Returns whether a thread is currently blocked inside one of the emit methods. */
+		public boolean isStillBlocking() {
+			return lock.isLocked();
+		}
+
+		/**
+		 * Blocks the calling thread until it is interrupted. The interruption flag is
+		 * restored before returning, and the lock is released in all cases.
+		 */
+		@SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+		private void block() {
+			lock.lock();
+			try {
+				inBlocking.trigger();
+
+				// put this thread to sleep indefinitely
+				final Object o = new Object();
+				while (true) {
+					synchronized (o) {
+						o.wait();
+					}
+				}
+			}
+			catch (InterruptedException e) {
+				// exit cleanly, simply reset the interruption flag
+				Thread.currentThread().interrupt();
+			}
+			finally {
+				lock.unlock();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
new file mode 100644
index 0000000..d18e2a9
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Test;
+
+/**
+ * Integration test suite for the Kafka 0.9 connector.
+ *
+ * <p>Every test is a thin wrapper: it delegates to a {@code run*} method that is not defined
+ * in this class (presumably inherited from {@link KafkaConsumerTestBase}) and bounds the run
+ * with a JUnit timeout of 60 seconds.
+ */
+public class Kafka09ITCase extends KafkaConsumerTestBase {
+
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	// ------------------------------------------------------------------------
+
+	@Test(timeout = 60000)
+	public void testFailOnNoBroker() throws Exception {
+		runFailOnNoBrokerTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testConcurrentProducerConsumerTopology() throws Exception {
+		runSimpleConcurrentProducerConsumerTopology();
+	}
+
+
+	@Test(timeout = 60000)
+	public void testKeyValueSupport() throws Exception {
+		runKeyValueTest();
+	}
+
+	// --- canceling / failures ---
+
+	@Test(timeout = 60000)
+	public void testCancelingEmptyTopic() throws Exception {
+		runCancelingOnEmptyInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testCancelingFullTopic() throws Exception {
+		runCancelingOnFullInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testFailOnDeploy() throws Exception {
+		runFailOnDeployTest();
+	}
+
+
+	// --- source to partition mappings and exactly once ---
+
+	@Test(timeout = 60000)
+	public void testOneToOneSources() throws Exception {
+		runOneToOneExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testOneSourceMultiplePartitions() throws Exception {
+		runOneSourceMultiplePartitionsExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleSourcesOnePartition() throws Exception {
+		runMultipleSourcesOnePartitionExactlyOnceTest();
+	}
+
+	// --- broker failure ---
+
+	@Test(timeout = 60000)
+	public void testBrokerFailure() throws Exception {
+		runBrokerFailureTest();
+	}
+
+	// --- special executions ---
+
+	@Test(timeout = 60000)
+	public void testBigRecordJob() throws Exception {
+		runBigRecordTestTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
+
+	@Test(timeout = 60000)
+	public void testAllDeletes() throws Exception {
+		runAllDeletesTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testEndOfStream() throws Exception {
+		runEndOfStreamTest();
+	}
+
+	// declared with 'throws Throwable', unlike the other tests, which declare 'throws Exception'
+	@Test(timeout = 60000)
+	public void testMetrics() throws Throwable {
+		runMetricsTest();
+	}
+
+	// --- offset committing ---
+
+	@Test(timeout = 60000)
+	public void testCommitOffsetsToKafka() throws Exception {
+		runCommitOffsetsToKafka();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromKafkaCommitOffsets() throws Exception {
+		runStartFromKafkaCommitOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+		runAutoOffsetRetrievalAndCommitToKafka();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
new file mode 100644
index 0000000..45f70ac
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Binds the generic {@link KafkaTableSinkTestBase} to {@link Kafka09JsonTableSink} by
+ * supplying the sink under test and its serialization schema.
+ */
+public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+	/**
+	 * Creates a {@link Kafka09JsonTableSink} whose {@code createKafkaProducer} is overridden
+	 * to hand back the given {@code kafkaProducer} instead of building a new producer.
+	 */
+	@Override
+	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+			final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+		return new Kafka09JsonTableSink(topic, properties, partitioner) {
+			@Override
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+					SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+				// all arguments are ignored on purpose: the test injects its own producer
+				return kafkaProducer;
+			}
+		};
+	}
+
+	/** Returns a {@link JsonRowSerializationSchema} built from {@code FIELD_NAMES}, which is defined outside this class. */
+	@Override
+	@SuppressWarnings("unchecked")
+	protected SerializationSchema<Row> getSerializationSchema() {
+		return new JsonRowSerializationSchema(FIELD_NAMES);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
new file mode 100644
index 0000000..4a75f50
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+/**
+ * Binds the generic {@link KafkaTableSourceTestBase} to {@link Kafka09JsonTableSource} by
+ * supplying the source under test and the classes the base test refers to.
+ */
+public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+	/** Creates the {@link Kafka09JsonTableSource} under test from the given arguments. */
+	@Override
+	protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+		return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo);
+	}
+
+	/** Returns {@link JsonRowDeserializationSchema}'s class object. */
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+		// a class literal cannot carry type arguments, hence the raw cast and the suppression
+		return (Class) JsonRowDeserializationSchema.class;
+	}
+
+	/** Returns {@link FlinkKafkaConsumer09}'s class object. */
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+		// a class literal cannot carry type arguments, hence the raw cast and the suppression
+		return (Class) FlinkKafkaConsumer09.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
new file mode 100644
index 0000000..ae4f5b2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.junit.Test;
+
+/**
+ * Producer integration test for the Kafka 0.9 connector; delegates to
+ * {@code runCustomPartitioningTest()}, which is not defined in this class (presumably
+ * inherited from {@link KafkaProducerTestBase}).
+ */
+@SuppressWarnings("serial")
+public class Kafka09ProducerITCase extends KafkaProducerTestBase {
+
+	// NOTE(review): this test declares no JUnit timeout -- confirm that this is intended
+	@Test
+	public void testCustomPartitioning() {
+		runCustomPartitioningTest();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
new file mode 100644
index 0000000..e748537
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Kafka secure connection (Kerberos) integration test case.
+ *
+ * <p>The class prepares a {@link SecureTestEnvironment}, starts the test clusters with the
+ * flag {@code true} and runs a single produce/consume test against them.
+ */
+public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
+
+	/**
+	 * Prepares the secure test environment, copies its settings into the Flink configuration
+	 * and then starts the clusters.
+	 */
+	@BeforeClass
+	public static void prepare() throws IOException, ClassNotFoundException {
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Starting Kafka09SecuredRunITCase ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		SecureTestEnvironment.prepare(tempFolder);
+		SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
+
+		// NOTE(review): 'true' presumably requests secure mode -- confirm in startClusters
+		startClusters(true);
+	}
+
+	/** Shuts down the clusters first and cleans up the secure test environment afterwards. */
+	@AfterClass
+	public static void shutDownServices() {
+		shutdownClusters();
+		SecureTestEnvironment.cleanup();
+	}
+
+
+	// The timeout is large because ZooKeeper connection timeouts occur frequently on Travis.
+	// NOTE(review): the original comment states that this is 2 times the ZK connection
+	// timeout -- confirm against the secure-mode ZK timeout used by the test environment.
+	@Test(timeout = 600000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..18b2aec
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Tests how {@link FlinkKafkaProducer09} reacts to a send failure reported through the Kafka
+ * producer callback. The {@link KafkaProducer} is replaced by a mock via PowerMock.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducerBase.class)
+public class KafkaProducerTest extends TestLogger {
+	
+	/**
+	 * Runs two scenarios against a mocked {@link KafkaProducer} whose {@code send} completes
+	 * the callback with an exception carrying the message "Test error":
+	 * (1) a producer with default settings is expected to fail with an exception whose cause
+	 * mentions "Test error"; (2) a producer with {@code setLogFailuresOnly(true)} is expected
+	 * to process the same elements and close without throwing.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testPropagateExceptions() {
+		try {
+			// mock kafka producer
+			KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
+			
+			// partition setup
+			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+				// returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
+				Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
+
+			// failure when trying to send an element
+			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
+				.thenAnswer(new Answer<Future<RecordMetadata>>() {
+					@Override
+					public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+						// the callback is completed synchronously, inside send(), with an error
+						Callback callback = (Callback) invocation.getArguments()[1];
+						callback.onCompletion(null, new Exception("Test error"));
+						return null;
+					}
+				});
+			
+			// make sure the FlinkKafkaProducer instantiates our mock producer
+			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+			
+			// (1) producer that propagates errors
+
+			FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+
+			OneInputStreamOperatorTestHarness<String, Object> testHarness =
+					new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
+
+			testHarness.open();
+
+			try {
+				// NOTE(review): the element is sent twice, presumably because the error recorded
+				// by the callback only surfaces on a later call -- confirm in FlinkKafkaProducerBase
+				testHarness.processElement(new StreamRecord<>("value"));
+				testHarness.processElement(new StreamRecord<>("value"));
+				fail("This should fail with an exception");
+			}
+			catch (Exception e) {
+				assertNotNull(e.getCause());
+				assertNotNull(e.getCause().getMessage());
+				assertTrue(e.getCause().getMessage().contains("Test error"));
+			}
+
+			// (2) producer that only logs errors
+
+			FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+			producerLogging.setLogFailuresOnly(true);
+
+			// NOTE(review): the harness of scenario (1) is replaced here without being closed
+			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
+
+			testHarness.open();
+
+			testHarness.processElement(new StreamRecord<>("value"));
+			testHarness.processElement(new StreamRecord<>("value"));
+
+			testHarness.close();
+		}
+		catch (Exception e) {
+			// any exception that reaches this point is unexpected and fails the test
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..1802e0c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.api.PartitionMetadata;
+import kafka.network.SocketServer;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the {@link KafkaTestEnvironment} for Kafka 0.9.
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+	// temp directory handed to the embedded ZooKeeper server
+	private File tmpZkDir;
+	// parent of the per-broker directories in tmpKafkaDirs
+	private File tmpKafkaParent;
+	// one log directory per broker, indexed like 'brokers'
+	private List<File> tmpKafkaDirs;
+	private List<KafkaServer> brokers;
+	private TestingServer zookeeper;
+	private String zookeeperConnectionString;
+	// host:port entries of all brokers, each one followed by a comma
+	private String brokerConnectionString = "";
+	// client properties assembled in prepare()
+	private Properties standardProps;
+	// extra broker properties passed to prepare(); may be null
+	private Properties additionalServerProperties;
+	private boolean secureMode = false;
+	// ZooKeeper session/connection timeout in milliseconds, kept as a String because it is
+	// written into Properties. The default of 6 seconds seems to be too small for Travis,
+	// so 30 seconds are used.
+	private String zkTimeout = "30000";
+
+	/**
+	 * Returns the broker list assembled in {@code prepare()}: one host:port entry per broker,
+	 * each followed by a comma (so the string ends with a comma).
+	 */
+	public String getBrokerConnectionString() {
+		return brokerConnectionString;
+	}
+
+	/**
+	 * Returns the client properties assembled in {@code prepare()}. The internal object
+	 * itself is returned, not a copy.
+	 */
+	@Override
+	public Properties getStandardProperties() {
+		return standardProps;
+	}
+
+	/** Returns the Kafka version string of this environment, {@code "0.9"}. */
+	@Override
+	public String getVersion() {
+		return "0.9";
+	}
+
+	/**
+	 * Returns the internal, mutable list of brokers; it is null until {@code prepare()} has
+	 * created it.
+	 */
+	@Override
+	public List<KafkaServer> getBrokers() {
+		return brokers;
+	}
+
+	/** Creates a {@link FlinkKafkaConsumer09} for the given topics, schema and properties. */
+	@Override
+	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
+		return new FlinkKafkaConsumer09<>(topics, readSchema, props);
+	}
+
+	/**
+	 * Creates a {@link FlinkKafkaProducer09} with flush-on-checkpoint enabled and wraps it
+	 * into a {@link StreamSink} operator.
+	 */
+	@Override
+	public <T> StreamSink<T> getProducerSink(
+			String topic,
+			KeyedSerializationSchema<T> serSchema,
+			Properties props,
+			KafkaPartitioner<T> partitioner) {
+		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return new StreamSink<>(prod);
+	}
+
+	/**
+	 * Creates a {@link FlinkKafkaProducer09} with flush-on-checkpoint enabled and attaches it
+	 * to the given stream as a sink.
+	 */
+	@Override
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return stream.addSink(prod);
+	}
+
+	/**
+	 * Creates an offset handler that opens its own {@link KafkaConsumer} from {@code props};
+	 * the caller has to close the returned handler.
+	 */
+	@Override
+	public KafkaOffsetHandler createOffsetHandler(Properties props) {
+		return new KafkaOffsetHandlerImpl(props);
+	}
+
+	/**
+	 * Starts a new broker with id {@code leaderId} on that broker's original log directory
+	 * and stores it at index {@code leaderId} of the broker list ({@code prepare()} adds the
+	 * broker with id i at index i).
+	 *
+	 * <p>The instance previously stored at that index is not shut down here.
+	 * NOTE(review): presumably callers stop it beforehand -- confirm.
+	 */
+	@Override
+	public void restartBroker(int leaderId) throws Exception {
+		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
+	}
+
+	@Override
+	public int getLeaderToShutDown(String topic) throws Exception {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			PartitionMetadata firstPart = null;
+			do {
+				if (firstPart != null) {
+					LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+					// not the first try. Sleep a bit
+					Thread.sleep(150);
+				}
+
+				Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata();
+				firstPart = partitionMetadata.head();
+			}
+			while (firstPart.errorCode() != 0);
+
+			return firstPart.leader().get().id();
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	/** Returns the broker id from the given server's configuration. */
+	@Override
+	public int getBrokerId(KafkaServer server) {
+		return server.config().brokerId();
+	}
+
+	/** Always true: this environment can start brokers in secure mode (see {@code prepare}). */
+	@Override
+	public boolean isSecureRunSupported() {
+		return true;
+	}
+
+	/**
+	 * Starts an embedded ZooKeeper server and the requested number of Kafka brokers, each
+	 * with its own temp directory, and assembles the standard client properties.
+	 *
+	 * <p>In secure mode only a single broker is started and the ZooKeeper timeout is
+	 * multiplied by 15. Any failure while starting ZooKeeper or a broker fails the test via
+	 * {@code Assert.fail}.
+	 *
+	 * @param numKafkaServers number of brokers to start (forced to 1 in secure mode)
+	 * @param additionalServerProperties extra broker properties, may be null
+	 * @param secureMode whether the brokers listen on SASL_PLAINTEXT instead of PLAINTEXT
+	 */
+	@Override
+	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+
+		//increase the timeout since in Travis ZK connection takes long time for secure connection.
+		if(secureMode) {
+			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
+			numKafkaServers = 1;
+			zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
+		}
+
+		this.additionalServerProperties = additionalServerProperties;
+		this.secureMode = secureMode;
+		File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+		// '*' is not a legal file name character on every platform (e.g. Windows) and is a
+		// glob character in shells, so the prefix ends with '-' like the ZooKeeper directory
+		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
+		for (int i = 0; i < numKafkaServers; i++) {
+			File tmpDir = new File(tmpKafkaParent, "server-" + i);
+			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+			tmpKafkaDirs.add(tmpDir);
+		}
+
+		zookeeper = null;
+		brokers = null;
+
+		// the listener protocol depends only on the mode, not on the individual broker
+		final SecurityProtocol protocol = secureMode ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
+		// collect the broker list locally so that a repeated prepare() starts from an empty
+		// list instead of appending to the entries of an earlier run
+		final StringBuilder brokerList = new StringBuilder();
+
+		try {
+			LOG.info("Starting Zookeeper");
+			zookeeper = new TestingServer(-1, tmpZkDir);
+			zookeeperConnectionString = zookeeper.getConnectString();
+			LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
+
+			LOG.info("Starting KafkaServer");
+			brokers = new ArrayList<>(numKafkaServers);
+
+			for (int i = 0; i < numKafkaServers; i++) {
+				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+
+				SocketServer socketServer = brokers.get(i).socketServer();
+				// every entry is followed by a comma, including the last one
+				brokerList
+					.append(hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, socketServer.boundPort(protocol)))
+					.append(',');
+			}
+			brokerConnectionString = brokerList.toString();
+
+			LOG.info("ZK and KafkaServer started.");
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			fail("Test setup failed: " + t.getMessage());
+		}
+
+		LOG.info("brokerConnectionString --> {}", brokerConnectionString);
+
+		standardProps = new Properties();
+		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+		standardProps.setProperty("group.id", "flink-tests");
+		standardProps.setProperty("enable.auto.commit", "false");
+		standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+		standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
+		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
+		standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+
+	}
+
+	/**
+	 * Shuts down all brokers and the embedded ZooKeeper server and deletes the temp
+	 * directories. Cleanup is best effort: a failure to stop ZooKeeper is logged, a failure
+	 * to delete a directory is ignored.
+	 *
+	 * <p>The method is safe to call when {@code prepare()} failed early or was never called.
+	 */
+	@Override
+	public void shutdown() {
+		// brokers is still null if prepare() failed before the broker list was created;
+		// without this check the resulting NullPointerException would hide the setup failure
+		if (brokers != null) {
+			for (KafkaServer broker : brokers) {
+				if (broker != null) {
+					broker.shutdown();
+				}
+			}
+			brokers.clear();
+		}
+
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+				zookeeper.close();
+			}
+			catch (Exception e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+			zookeeper = null;
+		}
+
+		// clean up the temp spaces
+
+		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpKafkaParent);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+		if (tmpZkDir != null && tmpZkDir.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpZkDir);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+	}
+
+	/**
+	 * Opens a new ZooKeeper client using the session and connection timeouts stored in the
+	 * standard properties and wraps it into a {@link ZkUtils}. The caller has to close the
+	 * returned object.
+	 */
+	public ZkUtils getZkUtils() {
+		LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
+
+		final int sessionTimeoutMs = Integer.parseInt(standardProps.getProperty("zookeeper.session.timeout.ms"));
+		final int connectionTimeoutMs = Integer.parseInt(standardProps.getProperty("zookeeper.connection.timeout.ms"));
+
+		final ZkClient zkClient = new ZkClient(
+				zookeeperConnectionString, sessionTimeoutMs, connectionTimeoutMs, new ZooKeeperStringSerializer());
+
+		return ZkUtils.apply(zkClient, false);
+	}
+
+	/**
+	 * Creates the topic through ZooKeeper and then polls until the topic is reported as
+	 * existing. Each poll uses a fresh ZooKeeper connection. The test fails if the topic does
+	 * not show up before the ZooKeeper timeout has elapsed, or if the waiting thread is
+	 * interrupted.
+	 *
+	 * @param topic name of the topic to create
+	 * @param numberOfPartitions number of partitions of the topic
+	 * @param replicationFactor replication factor of the topic
+	 * @param topicConfig topic-level configuration passed on to Kafka
+	 */
+	@Override
+	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
+		// create topic with one client
+		LOG.info("Creating topic {}", topic);
+
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig);
+		} finally {
+			zkUtils.close();
+		}
+
+		LOG.info("Topic {} create request is successfully posted", topic);
+
+		// validate that the topic has been created
+		final long deadline = System.currentTimeMillis() + Integer.parseInt(zkTimeout);
+		do {
+			try {
+				if(secureMode) {
+					//increase wait time since in Travis ZK timeout occurs frequently
+					int wait = Integer.parseInt(zkTimeout) / 100;
+					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+					Thread.sleep(wait);
+				} else {
+					Thread.sleep(100);
+				}
+
+			} catch (InterruptedException e) {
+				// restore the interrupted state and stop polling: with the flag set, every
+				// further sleep would return immediately and the loop would spin
+				Thread.currentThread().interrupt();
+				fail("Interrupted while waiting for the creation of topic " + topic);
+			}
+
+			LOG.info("Validating if the topic {} has been created or not", topic);
+
+			// use a new ZK connection for every check and always release it again, also
+			// when the existence check itself throws
+			ZkUtils checkZKConn = getZkUtils();
+			try {
+				if(AdminUtils.topicExists(checkZKConn, topic)) {
+					LOG.info("topic {} has been created successfully", topic);
+					return;
+				}
+				LOG.info("topic {} has not been created yet. Will check again...", topic);
+			} finally {
+				checkZKConn.close();
+			}
+		}
+		while (System.currentTimeMillis() < deadline);
+		fail("Test topic could not be created");
+	}
+
+	/**
+	 * Requests the deletion of the given topic through ZooKeeper. The ZooKeeper connection
+	 * used for the request is closed again in every case.
+	 *
+	 * @param topic name of the topic to delete
+	 */
+	@Override
+	public void deleteTestTopic(String topic) {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			LOG.info("Deleting topic {}", topic);
+
+			// a second, otherwise unused ZkClient used to be opened here; it leaked whenever
+			// deleteTopic threw, so only the connection that is actually needed is kept
+			AdminUtils.deleteTopic(zkUtils, topic);
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	/**
+	 * Configures and starts a Kafka broker with the given id on the given log directory.
+	 * A free port is picked for every attempt; if startup fails because the port is already
+	 * bound, the start is retried with a new port, up to five attempts in total.
+	 *
+	 * <p>Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+	 *
+	 * @param brokerId id of the broker to start
+	 * @param tmpFolder log directory of the broker
+	 * @return the started broker
+	 * @throws Exception if all attempts failed with a port conflict, or on any other startup failure
+	 */
+	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
+		final String maxMessageBytes = String.valueOf(50 * 1024 * 1024);
+		final Properties brokerProps = new Properties();
+
+		// properties have to be Strings
+		brokerProps.put("advertised.host.name", KAFKA_HOST);
+		brokerProps.put("broker.id", Integer.toString(brokerId));
+		brokerProps.put("log.dir", tmpFolder.toString());
+		brokerProps.put("zookeeper.connect", zookeeperConnectionString);
+		brokerProps.put("message.max.bytes", maxMessageBytes);
+		brokerProps.put("replica.fetch.max.bytes", maxMessageBytes);
+
+		// for CI stability, increase zookeeper session timeout
+		brokerProps.put("zookeeper.session.timeout.ms", zkTimeout);
+		brokerProps.put("zookeeper.connection.timeout.ms", zkTimeout);
+
+		// caller-supplied properties go in after the defaults above and may override them
+		if (additionalServerProperties != null) {
+			brokerProps.putAll(additionalServerProperties);
+		}
+
+		final int numTries = 5;
+
+		for (int attempt = 0; attempt < numTries; attempt++) {
+			final int kafkaPort = NetUtils.getAvailablePort();
+			brokerProps.put("port", Integer.toString(kafkaPort));
+
+			//to support secure kafka cluster
+			if (secureMode) {
+				LOG.info("Adding Kafka secure configurations");
+				final String listener = "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort;
+				brokerProps.put("listeners", listener);
+				brokerProps.put("advertised.listeners", listener);
+				brokerProps.putAll(getSecureProperties());
+			}
+
+			final KafkaConfig kafkaConfig = new KafkaConfig(brokerProps);
+
+			try {
+				final scala.Option<String> none = scala.Option.apply(null);
+				final KafkaServer broker = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, none);
+				broker.startup();
+				return broker;
+			}
+			catch (KafkaException e) {
+				// anything but a port conflict is a real failure
+				if (!(e.getCause() instanceof BindException)) {
+					throw e;
+				}
+				// port conflict, retry...
+				LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+			}
+		}
+
+		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+	}
+
+	/**
+	 * Returns the additional properties needed in secure mode: the SASL_PLAINTEXT protocol
+	 * settings, the Kerberos service name and the enlarged timeouts. Outside of secure mode
+	 * an empty {@link Properties} object is returned. Every call creates a new object.
+	 */
+	public Properties getSecureProperties() {
+		final Properties secureProps = new Properties();
+		if (!secureMode) {
+			return secureProps;
+		}
+
+		secureProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+		secureProps.put("security.protocol", "SASL_PLAINTEXT");
+		secureProps.put("sasl.kerberos.service.name", "kafka");
+
+		//add special timeout for Travis
+		secureProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+		secureProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
+		secureProps.setProperty("metadata.fetch.timeout.ms","120000");
+
+		return secureProps;
+	}
+
+	/**
+	 * Offset handler backed by its own {@link KafkaConsumer}, which is created in the
+	 * constructor and released in {@link #close()}.
+	 */
+	private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+		private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+		/** Creates the underlying consumer from the given properties. */
+		public KafkaOffsetHandlerImpl(Properties props) {
+			offsetClient = new KafkaConsumer<>(props);
+		}
+
+		/**
+		 * Returns the offset the consumer reports as committed for the given topic partition,
+		 * or null if the consumer reports none.
+		 */
+		@Override
+		public Long getCommittedOffset(String topicName, int partition) {
+			OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+			return (committed != null) ? committed.offset() : null;
+		}
+
+		/** Closes the underlying consumer. */
+		@Override
+		public void close() {
+			offsetClient.close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
new file mode 100644
index 0000000..25040eb
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link Handover} between the Kafka consumer thread and the fetcher's main thread.
+ */
+public class HandoverTest {
+
+	// ------------------------------------------------------------------------
+	//  test producer / consumer
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Hands over 500 record batches from a producer that pauses for a random, bounded
+	 * time after each batch to a consumer that never pauses.
+	 */
+	@Test
+	public void testWithVariableProducer() throws Exception {
+		final int numRecords = 500;
+		final int maxProducerDelay = 2;
+		final int maxConsumerDelay = 0;
+
+		runProducerConsumerTest(numRecords, maxProducerDelay, maxConsumerDelay);
+	}
+
+	/**
+	 * Hands over 500 record batches from a producer that never pauses to a consumer
+	 * that pauses for a random, bounded time after each poll.
+	 */
+	@Test
+	public void testWithVariableConsumer() throws Exception {
+		final int numRecords = 500;
+		final int maxProducerDelay = 0;
+		final int maxConsumerDelay = 2;
+
+		runProducerConsumerTest(numRecords, maxProducerDelay, maxConsumerDelay);
+	}
+
+	/**
+	 * Hands over 500 record batches while both the producer and the consumer pause
+	 * for a random, bounded time between their operations.
+	 */
+	@Test
+	public void testWithVariableBoth() throws Exception {
+		final int numRecords = 500;
+		final int maxProducerDelay = 2;
+		final int maxConsumerDelay = 2;
+
+		runProducerConsumerTest(numRecords, maxProducerDelay, maxConsumerDelay);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test error propagation
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Verifies that an error reported on an empty handover is the exception that
+	 * {@code pollNext()} throws to the consumer.
+	 */
+	@Test
+	public void testPublishErrorOnEmptyHandover() throws Exception {
+		final Handover handover = new Handover();
+
+		final Exception reported = new Exception();
+		handover.reportError(reported);
+
+		Exception caught = null;
+		try {
+			handover.pollNext();
+		}
+		catch (Exception e) {
+			caught = e;
+		}
+
+		if (caught == null) {
+			fail("should throw an exception");
+		}
+		assertEquals(reported, caught);
+	}
+
+	@Test
+	public void testPublishErrorOnFullHandover() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		IOException error = new IOException();
+		handover.reportError(error);
+
+		try {
+			handover.pollNext();
+			fail("should throw an exception");
+		}
+		catch (Exception e) {
+			assertEquals(error, e);
+		}
+	}
+
+	/**
+	 * Verifies that reporting an error on an empty handover closes it for the producer:
+	 * a subsequent {@code produce()} must throw a {@link Handover.ClosedException}.
+	 */
+	@Test
+	public void testExceptionMarksClosedOnEmpty() throws Exception {
+		final Handover handover = new Handover();
+		handover.reportError(new IllegalStateException());
+
+		boolean rejectedAsClosed = false;
+		try {
+			handover.produce(createTestRecords());
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+			rejectedAsClosed = true;
+		}
+
+		assertTrue("should throw an exception", rejectedAsClosed);
+	}
+
+	@Test
+	public void testExceptionMarksClosedOnFull() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		LinkageError error = new LinkageError();
+		handover.reportError(error);
+
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test closing behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Verifies that polling from a closed, empty handover throws a
+	 * {@link Handover.ClosedException}.
+	 */
+	@Test
+	public void testCloseEmptyForConsumer() throws Exception {
+		final Handover handover = new Handover();
+		handover.close();
+
+		boolean rejectedAsClosed = false;
+		try {
+			handover.pollNext();
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+			rejectedAsClosed = true;
+		}
+
+		assertTrue("should throw an exception", rejectedAsClosed);
+	}
+
+	/**
+	 * Verifies that polling from a handover that was closed while holding records
+	 * throws a {@link Handover.ClosedException}.
+	 */
+	@Test
+	public void testCloseFullForConsumer() throws Exception {
+		final Handover handover = new Handover();
+		// fill the handover before closing it
+		handover.produce(createTestRecords());
+		handover.close();
+
+		boolean rejectedAsClosed = false;
+		try {
+			handover.pollNext();
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+			rejectedAsClosed = true;
+		}
+
+		assertTrue("should throw an exception", rejectedAsClosed);
+	}
+
+	/**
+	 * Verifies that producing into a closed, empty handover throws a
+	 * {@link Handover.ClosedException}.
+	 */
+	@Test
+	public void testCloseEmptyForProducer() throws Exception {
+		final Handover handover = new Handover();
+		handover.close();
+
+		boolean rejectedAsClosed = false;
+		try {
+			handover.produce(createTestRecords());
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+			rejectedAsClosed = true;
+		}
+
+		assertTrue("should throw an exception", rejectedAsClosed);
+	}
+
+	/**
+	 * Verifies that producing into a handover that was closed while holding records
+	 * throws a {@link Handover.ClosedException}.
+	 */
+	@Test
+	public void testCloseFullForProducer() throws Exception {
+		final Handover handover = new Handover();
+		// fill the handover before closing it
+		handover.produce(createTestRecords());
+		handover.close();
+
+		boolean rejectedAsClosed = false;
+		try {
+			handover.produce(createTestRecords());
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+			rejectedAsClosed = true;
+		}
+
+		assertTrue("should throw an exception", rejectedAsClosed);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test wake up behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Verifies that {@code wakeupProducer()} only affects a producer that finds the
+	 * handover full: producing into an empty handover must succeed even after a
+	 * wake-up call, whereas producing into a full handover after a wake-up call must
+	 * throw a {@link WakeupException}.
+	 */
+	@Test
+	public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
+		final Handover handover = new Handover();
+		boolean wokenUp;
+
+		// produce into a woken but empty handover
+		handover.wakeupProducer();
+		wokenUp = false;
+		try {
+			handover.produce(createTestRecords());
+		}
+		catch (WakeupException e) {
+			wokenUp = true;
+		}
+		assertFalse(wokenUp);
+
+		// handover now has records, next time we wakeup and produce it needs
+		// to throw an exception
+		handover.wakeupProducer();
+		wokenUp = false;
+		try {
+			handover.produce(createTestRecords());
+		}
+		catch (WakeupException e) {
+			// expected
+			wokenUp = true;
+		}
+		assertTrue("should throw an exception", wokenUp);
+
+		// empty the handover
+		assertNotNull(handover.pollNext());
+
+		// producing into an empty handover should work
+		wokenUp = false;
+		try {
+			handover.produce(createTestRecords());
+		}
+		catch (WakeupException e) {
+			wokenUp = true;
+		}
+		assertFalse(wokenUp);
+	}
+
+	@Test
+	public void testWakeupWakesOnlyOnce() throws Exception {
+		// create a full handover
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		handover.wakeupProducer();
+
+		try {
+			handover.produce(createTestRecords());
+			fail();
+		} catch (WakeupException e) {
+			// expected
+		}
+
+		CheckedThread producer = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				handover.produce(createTestRecords());
+			}
+		};
+		producer.start();
+
+		// the producer must go blocking
+		producer.waitUntilThreadHoldsLock(10000);
+
+		// release the thread by consuming something
+		assertNotNull(handover.pollNext());
+		producer.sync();
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Runs a producer thread and a consumer thread against one shared {@link Handover}
+	 * and waits for both to finish. The consumer checks that it polls the record
+	 * batches in the order in which they appear in the generated test data.
+	 *
+	 * @param numRecords number of record batches to hand over
+	 * @param maxProducerDelay exclusive upper bound (in milliseconds) of the random pause
+	 *                         of the producer after each batch; no pause if not positive
+	 * @param maxConsumerDelay exclusive upper bound (in milliseconds) of the random pause
+	 *                         of the consumer after each poll; no pause if not positive
+	 */
+	private void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception {
+		// generate test data
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final ConsumerRecords<byte[], byte[]>[] testData = new ConsumerRecords[numRecords];
+		for (int idx = 0; idx < testData.length; idx++) {
+			testData[idx] = createTestRecords();
+		}
+
+		final Handover handover = new Handover();
+
+		final ProducerThread producerThread = new ProducerThread(handover, testData, maxProducerDelay);
+		final ConsumerThread consumerThread = new ConsumerThread(handover, testData, maxConsumerDelay);
+
+		consumerThread.start();
+		producerThread.start();
+
+		// sync first on the consumer, so it propagates assertion errors
+		consumerThread.sync();
+		producerThread.sync();
+	}
+
+	@SuppressWarnings("unchecked")
+	private static ConsumerRecords<byte[], byte[]> createTestRecords() {
+		return mock(ConsumerRecords.class);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static abstract class CheckedThread extends Thread {
+
+		private volatile Throwable error;
+
+		public abstract void go() throws Exception;
+
+		@Override
+		public void run() {
+			try {
+				go();
+			}
+			catch (Throwable t) {
+				error = t;
+			}
+		}
+
+		public void sync() throws Exception {
+			join();
+			if (error != null) {
+				ExceptionUtils.rethrowException(error, error.getMessage());
+			}
+		}
+
+		public void waitUntilThreadHoldsLock(long timeoutMillis) throws InterruptedException, TimeoutException {
+			final long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
+			
+			while (!isBlockedOrWaiting() && (System.nanoTime() < deadline)) {
+				Thread.sleep(1);
+			}
+
+			if (!isBlockedOrWaiting()) {
+				throw new TimeoutException();
+			}
+		}
+
+		private boolean isBlockedOrWaiting() {
+			State state = getState();
+			return state == State.BLOCKED || state == State.WAITING || state == State.TIMED_WAITING;
+		}
+	}
+
+	/**
+	 * Thread that produces all given record batches, in array order, into the handover.
+	 * After each batch it optionally sleeps for a random time below {@code maxDelay}
+	 * milliseconds.
+	 */
+	private static class ProducerThread extends CheckedThread {
+
+		private final Random rnd = new Random();
+		private final Handover handover;
+		private final ConsumerRecords<byte[], byte[]>[] data;
+		private final int maxDelay;
+
+		private ProducerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+			this.handover = handover;
+			this.data = data;
+			this.maxDelay = maxDelay;
+		}
+
+		@Override
+		public void go() throws Exception {
+			for (int i = 0; i < data.length; i++) {
+				handover.produce(data[i]);
+
+				if (maxDelay <= 0) {
+					// no artificial delay configured
+					continue;
+				}
+				Thread.sleep(rnd.nextInt(maxDelay));
+			}
+		}
+	}
+
+	/**
+	 * Thread that polls one record batch from the handover per expected batch and
+	 * asserts that the batches arrive in array order. After each poll it optionally
+	 * sleeps for a random time below {@code maxDelay} milliseconds.
+	 */
+	private static class ConsumerThread extends CheckedThread {
+
+		private final Random rnd = new Random();
+		private final Handover handover;
+		private final ConsumerRecords<byte[], byte[]>[] data;
+		private final int maxDelay;
+
+		private ConsumerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+			this.handover = handover;
+			this.data = data;
+			this.maxDelay = maxDelay;
+		}
+
+		@Override
+		public void go() throws Exception {
+			for (int i = 0; i < data.length; i++) {
+				final ConsumerRecords<byte[], byte[]> expected = data[i];
+				final ConsumerRecords<byte[], byte[]> polled = handover.pollNext();
+
+				assertEquals(expected, polled);
+
+				if (maxDelay <= 0) {
+					// no artificial delay configured
+					continue;
+				}
+				Thread.sleep(rnd.nextInt(maxDelay));
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..4ac1773
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -0,0 +1,32 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger
+
+log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file