You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:59 UTC

[09/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
deleted file mode 100644
index d495327..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.SerializedValue;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
- * 
- * @param <T> The type of elements produced by the fetcher.
- */
-public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class);
-
-	// ------------------------------------------------------------------------
-
-	/** The schema to convert between Kafka's byte messages, and Flink's objects */
-	private final KeyedDeserializationSchema<T> deserializer;
-
-	/** The handover of data and exceptions between the consumer thread and the task thread */
-	private final Handover handover;
-
-	/** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher */
-	private final KafkaConsumerThread consumerThread;
-
-	/** Flag to mark the main work loop as alive */
-	private volatile boolean running = true;
-
-	// ------------------------------------------------------------------------
-
-	public Kafka09Fetcher(
-			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> assignedPartitions,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			ProcessingTimeService processingTimeProvider,
-			long autoWatermarkInterval,
-			ClassLoader userCodeClassLoader,
-			boolean enableCheckpointing,
-			String taskNameWithSubtasks,
-			MetricGroup metricGroup,
-			KeyedDeserializationSchema<T> deserializer,
-			Properties kafkaProperties,
-			long pollTimeout,
-			boolean useMetrics) throws Exception
-	{
-		super(
-				sourceContext,
-				assignedPartitions,
-				watermarksPeriodic,
-				watermarksPunctuated,
-				processingTimeProvider,
-				autoWatermarkInterval,
-				userCodeClassLoader,
-				useMetrics);
-
-		this.deserializer = deserializer;
-		this.handover = new Handover();
-
-		final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
-		addOffsetStateGauge(kafkaMetricGroup);
-
-		// if checkpointing is enabled, we are not automatically committing to Kafka.
-		kafkaProperties.setProperty(
-				ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-				Boolean.toString(!enableCheckpointing));
-		
-		this.consumerThread = new KafkaConsumerThread(
-				LOG,
-				handover,
-				kafkaProperties,
-				subscribedPartitions(),
-				kafkaMetricGroup,
-				createCallBridge(),
-				getFetcherName() + " for " + taskNameWithSubtasks,
-				pollTimeout,
-				useMetrics);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Fetcher work methods
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void runFetchLoop() throws Exception {
-		try {
-			final Handover handover = this.handover;
-
-			// kick off the actual Kafka consumer
-			consumerThread.start();
-
-			while (running) {
-				// this blocks until we get the next records
-				// it automatically re-throws exceptions encountered in the fetcher thread
-				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
-
-				// get the records for each topic partition
-				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-
-					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
-							records.records(partition.getKafkaPartitionHandle());
-
-					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-
-						final T value = deserializer.deserialize(
-								record.key(), record.value(),
-								record.topic(), record.partition(), record.offset());
-
-						if (deserializer.isEndOfStream(value)) {
-							// end of stream signaled
-							running = false;
-							break;
-						}
-
-						// emit the actual record. this also updates offset state atomically
-						// and deals with timestamps and watermark generation
-						emitRecord(value, partition, record.offset(), record);
-					}
-				}
-			}
-		}
-		finally {
-			// this signals the consumer thread that no more work is to be done
-			consumerThread.shutdown();
-		}
-
-		// on a clean exit, wait for the runner thread
-		try {
-			consumerThread.join();
-		}
-		catch (InterruptedException e) {
-			// may be the result of a wake-up interruption after an exception.
-			// we ignore this here and only restore the interruption state
-			Thread.currentThread().interrupt();
-		}
-	}
-
-	@Override
-	public void cancel() {
-		// flag the main thread to exit. A thread interrupt will come anyways.
-		running = false;
-		handover.close();
-		consumerThread.shutdown();
-	}
-
-	// ------------------------------------------------------------------------
-	//  The below methods are overridden in the 0.10 fetcher, which otherwise
-	//   reuses most of the 0.9 fetcher behavior
-	// ------------------------------------------------------------------------
-
-	protected void emitRecord(
-			T record,
-			KafkaTopicPartitionState<TopicPartition> partition,
-			long offset,
-			@SuppressWarnings("UnusedParameters") ConsumerRecord<?, ?> consumerRecord) throws Exception {
-
-		// the 0.9 Fetcher does not try to extract a timestamp
-		emitRecord(record, partition, offset);
-	}
-
-	/**
-	 * Gets the name of this fetcher, for thread naming and logging purposes.
-	 */
-	protected String getFetcherName() {
-		return "Kafka 0.9 Fetcher";
-	}
-
-	protected KafkaConsumerCallBridge createCallBridge() {
-		return new KafkaConsumerCallBridge();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Implement Methods of the AbstractFetcher
-	// ------------------------------------------------------------------------
-
-	@Override
-	public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
-		return new TopicPartition(partition.getTopic(), partition.getPartition());
-	}
-
-	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
-		KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
-		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
-
-		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
-			Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
-			if (lastProcessedOffset != null) {
-				// committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
-				// This does not affect Flink's checkpoints/saved state.
-				long offsetToCommit = lastProcessedOffset + 1;
-
-				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
-				partition.setCommittedOffset(offsetToCommit);
-			}
-		}
-
-		// record the work to be committed by the main consumer thread and make sure the consumer notices that
-		consumerThread.setOffsetsToCommit(offsetsToCommit);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
deleted file mode 100644
index c17aae6..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.List;
-
-/**
- * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}.
- * 
- * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
- * for example changing {@code assign(List)} to {@code assign(Collection)}.
- * 
- * Because of that, we need to two versions whose compiled code goes against different method signatures.
- * Even though the source of subclasses may look identical, the byte code will be different, because they
- * are compiled against different dependencies.
- */
-public class KafkaConsumerCallBridge {
-
-	public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
-		consumer.assign(topicPartitions);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
deleted file mode 100644
index 9cfa840..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The thread the runs the {@link KafkaConsumer}, connecting to the brokers and polling records.
- * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will
- * deserialize and emit the records.
- * 
- * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down.
- * The Kafka consumer code was found to not always handle interrupts well, and to even
- * deadlock in certain situations.
- * 
- * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer.
- * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection
- * to the KafkaConsumer calls that change signature.
- */
-public class KafkaConsumerThread extends Thread {
-
-	/** Logger for this consumer */
-	private final Logger log;
-
-	/** The handover of data and exceptions between the consumer thread and the task thread */
-	private final Handover handover;
-
-	/** The next offsets that the main thread should commit */
-	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
-
-	/** The configuration for the Kafka consumer */
-	private final Properties kafkaProperties;
-
-	/** The partitions that this consumer reads from */ 
-	private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
-
-	/** We get this from the outside to publish metrics. **/
-	private final MetricGroup kafkaMetricGroup;
-
-	/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */
-	private final KafkaConsumerCallBridge consumerCallBridge;
-
-	/** The maximum number of milliseconds to wait for a fetch batch */
-	private final long pollTimeout;
-
-	/** Flag whether to add Kafka's metrics to the Flink metrics */
-	private final boolean useMetrics;
-
-	/** Reference to the Kafka consumer, once it is created */
-	private volatile KafkaConsumer<byte[], byte[]> consumer;
-
-	/** Flag to mark the main work loop as alive */
-	private volatile boolean running;
-
-	/** Flag tracking whether the latest commit request has completed */
-	private volatile boolean commitInProgress;
-
-
-	public KafkaConsumerThread(
-			Logger log,
-			Handover handover,
-			Properties kafkaProperties,
-			KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions,
-			MetricGroup kafkaMetricGroup,
-			KafkaConsumerCallBridge consumerCallBridge,
-			String threadName,
-			long pollTimeout,
-			boolean useMetrics) {
-
-		super(threadName);
-		setDaemon(true);
-
-		this.log = checkNotNull(log);
-		this.handover = checkNotNull(handover);
-		this.kafkaProperties = checkNotNull(kafkaProperties);
-		this.subscribedPartitions = checkNotNull(subscribedPartitions);
-		this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
-		this.consumerCallBridge = checkNotNull(consumerCallBridge);
-		this.pollTimeout = pollTimeout;
-		this.useMetrics = useMetrics;
-
-		this.nextOffsetsToCommit = new AtomicReference<>();
-		this.running = true;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void run() {
-		// early exit check
-		if (!running) {
-			return;
-		}
-
-		// this is the means to talk to FlinkKafkaConsumer's main thread
-		final Handover handover = this.handover;
-
-		// This method initializes the KafkaConsumer and guarantees it is torn down properly.
-		// This is important, because the consumer has multi-threading issues,
-		// including concurrent 'close()' calls.
-		final KafkaConsumer<byte[], byte[]> consumer;
-		try {
-			consumer = new KafkaConsumer<>(kafkaProperties);
-		}
-		catch (Throwable t) {
-			handover.reportError(t);
-			return;
-		}
-
-		// from here on, the consumer is guaranteed to be closed properly
-		try {
-			// The callback invoked by Kafka once an offset commit is complete
-			final OffsetCommitCallback offsetCommitCallback = new CommitCallback();
-
-			// tell the consumer which partitions to work with
-			consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));
-
-			// register Kafka's very own metrics in Flink's metric reporters
-			if (useMetrics) {
-				// register Kafka metrics to Flink
-				Map<MetricName, ? extends Metric> metrics = consumer.metrics();
-				if (metrics == null) {
-					// MapR's Kafka implementation returns null here.
-					log.info("Consumer implementation does not support metrics");
-				} else {
-					// we have Kafka metrics, register them
-					for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
-						kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
-					}
-				}
-			}
-
-			// early exit check
-			if (!running) {
-				return;
-			}
-
-			// seek the consumer to the initial offsets
-			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
-				if (partition.isOffsetDefined()) {
-					log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " +
-							"seeking the consumer to position {}",
-							partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
-
-					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
-				}
-				else {
-					// for partitions that do not have offsets restored from a checkpoint/savepoint,
-					// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
-					// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
-
-					long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
-
-					log.info("Partition {} has no initial offset; the consumer has position {}, " +
-							"so the initial offset will be set to {}",
-							partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
-
-					// the fetched offset represents the next record to process, so we need to subtract it by 1
-					partition.setOffset(fetchedOffset - 1);
-				}
-			}
-
-			// from now on, external operations may call the consumer
-			this.consumer = consumer;
-
-			// the latest bulk of records. may carry across the loop if the thread is woken up
-			// from blocking on the handover
-			ConsumerRecords<byte[], byte[]> records = null;
-
-			// main fetch loop
-			while (running) {
-
-				// check if there is something to commit
-				if (!commitInProgress) {
-					// get and reset the work-to-be committed, so we don't repeatedly commit the same
-					final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
-
-					if (toCommit != null) {
-						log.debug("Sending async offset commit request to Kafka broker");
-
-						// also record that a commit is already in progress
-						// the order here matters! first set the flag, then send the commit command.
-						commitInProgress = true;
-						consumer.commitAsync(toCommit, offsetCommitCallback);
-					}
-				}
-
-				// get the next batch of records, unless we did not manage to hand the old batch over
-				if (records == null) {
-					try {
-						records = consumer.poll(pollTimeout);
-					}
-					catch (WakeupException we) {
-						continue;
-					}
-				}
-
-				try {
-					handover.produce(records);
-					records = null;
-				}
-				catch (Handover.WakeupException e) {
-					// fall through the loop
-				}
-			}
-			// end main fetch loop
-		}
-		catch (Throwable t) {
-			// let the main thread know and exit
-			// it may be that this exception comes because the main thread closed the handover, in
-			// which case the below reporting is irrelevant, but does not hurt either
-			handover.reportError(t);
-		}
-		finally {
-			// make sure the handover is closed if it is not already closed or has an error
-			handover.close();
-
-			// make sure the KafkaConsumer is closed
-			try {
-				consumer.close();
-			}
-			catch (Throwable t) {
-				log.warn("Error while closing Kafka consumer", t);
-			}
-		}
-	}
-
-	/**
-	 * Shuts this thread down, waking up the thread gracefully if blocked (without Thread.interrupt() calls).
-	 */
-	public void shutdown() {
-		running = false;
-
-		// We cannot call close() on the KafkaConsumer, because it will actually throw
-		// an exception if a concurrent call is in progress
-
-		// this wakes up the consumer if it is blocked handing over records
-		handover.wakeupProducer();
-
-		// this wakes up the consumer if it is blocked in a kafka poll 
-		if (consumer != null) {
-			consumer.wakeup();
-		}
-	}
-
-	/**
-	 * Tells this thread to commit a set of offsets. This method does not block, the committing
-	 * operation will happen asynchronously.
-	 * 
-	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
-	 * the frequency with which this method is called, then some commits may be skipped due to being
-	 * superseded  by newer ones.
-	 * 
-	 * @param offsetsToCommit The offsets to commit
-	 */
-	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
-		// record the work to be committed by the main consumer thread and make sure the consumer notices that
-		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
-			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
-					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
-					"This does not compromise Flink's checkpoint integrity.");
-		}
-
-		// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
-		handover.wakeupProducer();
-		if (consumer != null) {
-			consumer.wakeup();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
-		ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
-		for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
-			result.add(p.getKafkaPartitionHandle());
-		}
-		return result;
-	}
-
-	// ------------------------------------------------------------------------
-
-	private class CommitCallback implements OffsetCommitCallback {
-
-		@Override
-		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
-			commitInProgress = false;
-
-			if (ex != null) {
-				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
deleted file mode 100644
index 6bdfb48..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
deleted file mode 100644
index 7a82365..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.TopicPartition;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-/**
- * Unit tests for the {@link Kafka09Fetcher}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConsumerThread.class)
-public class Kafka09FetcherTest {
-
-	@Test
-	public void testCommitDoesNotBlock() throws Exception {
-
-		// test data
-		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
-		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
-		testCommitData.put(testPartition, 11L);
-
-		// to synchronize when the consumer is in its blocking method
-		final OneShotLatch sync = new OneShotLatch();
-
-		// ----- the mock consumer with blocking poll calls ----
-		final MultiShotLatch blockerLatch = new MultiShotLatch();
-		
-		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-			
-			@Override
-			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
-				sync.trigger();
-				blockerLatch.await();
-				return ConsumerRecords.empty();
-			}
-		});
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) {
-				blockerLatch.trigger();
-				return null;
-			}
-		}).when(mockConsumer).wakeup();
-
-		// make sure the fetcher creates the mock consumer
-		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-		// ----- create the test fetcher -----
-
-		@SuppressWarnings("unchecked")
-		SourceContext<String> sourceContext = mock(SourceContext.class);
-		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
-		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
-				sourceContext,
-				topics,
-				null, /* periodic watermark extractor */
-				null, /* punctuated watermark extractor */
-				new TestProcessingTimeService(),
-				10, /* watermark interval */
-				this.getClass().getClassLoader(),
-				true, /* checkpointing */
-				"task_name",
-				new UnregisteredMetricsGroup(),
-				schema,
-				new Properties(),
-				0L,
-				false);
-
-		// ----- run the fetcher -----
-
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-		final Thread fetcherRunner = new Thread("fetcher runner") {
-
-			@Override
-			public void run() {
-				try {
-					fetcher.runFetchLoop();
-				} catch (Throwable t) {
-					error.set(t);
-				}
-			}
-		};
-		fetcherRunner.start();
-
-		// wait until the fetcher has reached the method of interest
-		sync.await();
-
-		// ----- trigger the offset commit -----
-		
-		final AtomicReference<Throwable> commitError = new AtomicReference<>();
-		final Thread committer = new Thread("committer runner") {
-			@Override
-			public void run() {
-				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData);
-				} catch (Throwable t) {
-					commitError.set(t);
-				}
-			}
-		};
-		committer.start();
-
-		// ----- ensure that the committer finishes in time  -----
-		committer.join(30000);
-		assertFalse("The committer did not finish in time", committer.isAlive());
-
-		// ----- test done, wait till the fetcher is done for a clean shutdown -----
-		fetcher.cancel();
-		fetcherRunner.join();
-
-		// check that there were no errors in the fetcher
-		final Throwable fetcherError = error.get();
-		if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
-			throw new Exception("Exception in the fetcher", fetcherError);
-		}
-		final Throwable committerError = commitError.get();
-		if (committerError != null) {
-			throw new Exception("Exception in the committer", committerError);
-		}
-	}
-
-	@Test
-	public void ensureOffsetsGetCommitted() throws Exception {
-		
-		// test data
-		final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
-		final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
-		
-		final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
-		testCommitData1.put(testPartition1, 11L);
-		testCommitData1.put(testPartition2, 18L);
-
-		final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
-		testCommitData2.put(testPartition1, 19L);
-		testCommitData2.put(testPartition2, 28L);
-
-		final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
-
-
-		// ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
-
-		final MultiShotLatch blockerLatch = new MultiShotLatch();
-
-		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-
-		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-			@Override
-			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
-				blockerLatch.await();
-				return ConsumerRecords.empty();
-			}
-		});
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) {
-				blockerLatch.trigger();
-				return null;
-			}
-		}).when(mockConsumer).wakeup();
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) {
-				@SuppressWarnings("unchecked")
-				Map<TopicPartition, OffsetAndMetadata> offsets = 
-						(Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
-
-				OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
-
-				commitStore.add(offsets);
-				callback.onComplete(offsets, null);
-
-				return null; 
-			}
-		}).when(mockConsumer).commitAsync(
-				Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
-
-		// make sure the fetcher creates the mock consumer
-		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-		// ----- create the test fetcher -----
-
-		@SuppressWarnings("unchecked")
-		SourceContext<String> sourceContext = mock(SourceContext.class);
-		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
-		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
-				sourceContext,
-				topics,
-				null, /* periodic watermark extractor */
-				null, /* punctuated watermark extractor */
-				new TestProcessingTimeService(),
-				10, /* watermark interval */
-				this.getClass().getClassLoader(),
-				true, /* checkpointing */
-				"task_name",
-				new UnregisteredMetricsGroup(),
-				schema,
-				new Properties(),
-				0L,
-				false);
-
-
-		// ----- run the fetcher -----
-
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-		final Thread fetcherRunner = new Thread("fetcher runner") {
-
-			@Override
-			public void run() {
-				try {
-					fetcher.runFetchLoop();
-				} catch (Throwable t) {
-					error.set(t);
-				}
-			}
-		};
-		fetcherRunner.start();
-
-		// ----- trigger the first offset commit -----
-
-		fetcher.commitInternalOffsetsToKafka(testCommitData1);
-		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
-
-		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
-			TopicPartition partition = entry.getKey();
-			if (partition.topic().equals("test")) {
-				assertEquals(42, partition.partition());
-				assertEquals(12L, entry.getValue().offset());
-			}
-			else if (partition.topic().equals("another")) {
-				assertEquals(99, partition.partition());
-				assertEquals(17L, entry.getValue().offset());
-			}
-		}
-
-		// ----- trigger the second offset commit -----
-
-		fetcher.commitInternalOffsetsToKafka(testCommitData2);
-		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
-
-		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
-			TopicPartition partition = entry.getKey();
-			if (partition.topic().equals("test")) {
-				assertEquals(42, partition.partition());
-				assertEquals(20L, entry.getValue().offset());
-			}
-			else if (partition.topic().equals("another")) {
-				assertEquals(99, partition.partition());
-				assertEquals(27L, entry.getValue().offset());
-			}
-		}
-		
-		// ----- test done, wait till the fetcher is done for a clean shutdown -----
-		fetcher.cancel();
-		fetcherRunner.join();
-
-		// check that there were no errors in the fetcher
-		final Throwable caughtError = error.get();
-		if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
-			throw new Exception("Exception in the fetcher", caughtError);
-		}
-	}
-
-	@Test
-	public void testCancellationWhenEmitBlocks() throws Exception {
-
-		// ----- some test data -----
-
-		final String topic = "test-topic";
-		final int partition = 3;
-		final byte[] payload = new byte[] {1, 2, 3, 4};
-
-		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
-
-		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
-		data.put(new TopicPartition(topic, partition), records);
-
-		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
-
-		// ----- the test consumer -----
-
-		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-			@Override
-			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
-				return consumerRecords;
-			}
-		});
-
-		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-		// ----- build a fetcher -----
-
-		BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
-		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
-		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
-				sourceContext,
-				topics,
-				null, /* periodic watermark extractor */
-				null, /* punctuated watermark extractor */
-				new TestProcessingTimeService(),
-				10, /* watermark interval */
-				this.getClass().getClassLoader(),
-				true, /* checkpointing */
-				"task_name",
-				new UnregisteredMetricsGroup(),
-				schema,
-				new Properties(),
-				0L,
-				false);
-
-
-		// ----- run the fetcher -----
-
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-		final Thread fetcherRunner = new Thread("fetcher runner") {
-
-			@Override
-			public void run() {
-				try {
-					fetcher.runFetchLoop();
-				} catch (Throwable t) {
-					error.set(t);
-				}
-			}
-		};
-		fetcherRunner.start();
-
-		// wait until the thread started to emit records to the source context
-		sourceContext.waitTillHasBlocker();
-
-		// now we try to cancel the fetcher, including the interruption usually done on the task thread
-		// once it has finished, there must be no more thread blocked on the source context
-		fetcher.cancel();
-		fetcherRunner.interrupt();
-		fetcherRunner.join();
-
-		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
-	}
-
-	// ------------------------------------------------------------------------
-	//  test utilities
-	// ------------------------------------------------------------------------
-
-	private static final class BlockingSourceContext<T> implements SourceContext<T> {
-
-		private final ReentrantLock lock = new ReentrantLock();
-		private final OneShotLatch inBlocking = new OneShotLatch();
-
-		@Override
-		public void collect(T element) {
-			block();
-		}
-
-		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			block();
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			block();
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return new Object();
-		}
-
-		@Override
-		public void close() {}
-
-		public void waitTillHasBlocker() throws InterruptedException {
-			inBlocking.await();
-		}
-
-		public boolean isStillBlocking() {
-			return lock.isLocked();
-		}
-
-		@SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
-		private void block() {
-			lock.lock();
-			try {
-				inBlocking.trigger();
-
-				// put this thread to sleep indefinitely
-				final Object o = new Object();
-				while (true) {
-					synchronized (o) {
-						o.wait();
-					}
-				}
-			}
-			catch (InterruptedException e) {
-				// exit cleanly, simply reset the interruption flag
-				Thread.currentThread().interrupt();
-			}
-			finally {
-				lock.unlock();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
deleted file mode 100644
index d18e2a9..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-public class Kafka09ITCase extends KafkaConsumerTestBase {
-
-	// ------------------------------------------------------------------------
-	//  Suite of Tests
-	// ------------------------------------------------------------------------
-
-	@Test(timeout = 60000)
-	public void testFailOnNoBroker() throws Exception {
-		runFailOnNoBrokerTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testConcurrentProducerConsumerTopology() throws Exception {
-		runSimpleConcurrentProducerConsumerTopology();
-	}
-
-
-	@Test(timeout = 60000)
-	public void testKeyValueSupport() throws Exception {
-		runKeyValueTest();
-	}
-
-	// --- canceling / failures ---
-
-	@Test(timeout = 60000)
-	public void testCancelingEmptyTopic() throws Exception {
-		runCancelingOnEmptyInputTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testCancelingFullTopic() throws Exception {
-		runCancelingOnFullInputTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testFailOnDeploy() throws Exception {
-		runFailOnDeployTest();
-	}
-
-
-	// --- source to partition mappings and exactly once ---
-
-	@Test(timeout = 60000)
-	public void testOneToOneSources() throws Exception {
-		runOneToOneExactlyOnceTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testOneSourceMultiplePartitions() throws Exception {
-		runOneSourceMultiplePartitionsExactlyOnceTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testMultipleSourcesOnePartition() throws Exception {
-		runMultipleSourcesOnePartitionExactlyOnceTest();
-	}
-
-	// --- broker failure ---
-
-	@Test(timeout = 60000)
-	public void testBrokerFailure() throws Exception {
-		runBrokerFailureTest();
-	}
-
-	// --- special executions ---
-
-	@Test(timeout = 60000)
-	public void testBigRecordJob() throws Exception {
-		runBigRecordTestTopology();
-	}
-
-	@Test(timeout = 60000)
-	public void testMultipleTopics() throws Exception {
-		runProduceConsumeMultipleTopics();
-	}
-
-	@Test(timeout = 60000)
-	public void testAllDeletes() throws Exception {
-		runAllDeletesTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testEndOfStream() throws Exception {
-		runEndOfStreamTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testMetrics() throws Throwable {
-		runMetricsTest();
-	}
-
-	// --- offset committing ---
-
-	@Test(timeout = 60000)
-	public void testCommitOffsetsToKafka() throws Exception {
-		runCommitOffsetsToKafka();
-	}
-
-	@Test(timeout = 60000)
-	public void testStartFromKafkaCommitOffsets() throws Exception {
-		runStartFromKafkaCommitOffsets();
-	}
-
-	@Test(timeout = 60000)
-	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
-		runAutoOffsetRetrievalAndCommitToKafka();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
deleted file mode 100644
index 45f70ac..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
-
-	@Override
-	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
-			final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
-		return new Kafka09JsonTableSink(topic, properties, partitioner) {
-			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
-					SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
-				return kafkaProducer;
-			}
-		};
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected SerializationSchema<Row> getSerializationSchema() {
-		return new JsonRowSerializationSchema(FIELD_NAMES);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
deleted file mode 100644
index 4a75f50..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import java.util.Properties;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
-
-	@Override
-	protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
-		return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-		return (Class) JsonRowDeserializationSchema.class;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer09.class;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
deleted file mode 100644
index ae4f5b2..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class Kafka09ProducerITCase extends KafkaProducerTestBase {
-
-	@Test
-	public void testCustomPartitioning() {
-		runCustomPartitioningTest();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
deleted file mode 100644
index e748537..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.test.util.SecureTestEnvironment;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/*
- * Kafka Secure Connection (kerberos) IT test case
- */
-public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
-
-	@BeforeClass
-	public static void prepare() throws IOException, ClassNotFoundException {
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    Starting Kafka09SecuredRunITCase ");
-		LOG.info("-------------------------------------------------------------------------");
-
-		SecureTestEnvironment.prepare(tempFolder);
-		SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
-
-		startClusters(true);
-	}
-
-	@AfterClass
-	public static void shutDownServices() {
-		shutdownClusters();
-		SecureTestEnvironment.cleanup();
-	}
-
-
-	//timeout interval is large since in Travis, ZK connection timeout occurs frequently
-	//The timeout for the test case is 2 times timeout of ZK connection
-	@Test(timeout = 600000)
-	public void testMultipleTopics() throws Exception {
-		runProduceConsumeMultipleTopics();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index 18b2aec..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FlinkKafkaProducerBase.class)
-public class KafkaProducerTest extends TestLogger {
-	
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testPropagateExceptions() {
-		try {
-			// mock kafka producer
-			KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
-			
-			// partition setup
-			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-				// returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
-				Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
-
-			// failure when trying to send an element
-			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
-				.thenAnswer(new Answer<Future<RecordMetadata>>() {
-					@Override
-					public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-						Callback callback = (Callback) invocation.getArguments()[1];
-						callback.onCompletion(null, new Exception("Test error"));
-						return null;
-					}
-				});
-			
-			// make sure the FlinkKafkaProducer instantiates our mock producer
-			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-			
-			// (1) producer that propagates errors
-
-			FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
-
-			OneInputStreamOperatorTestHarness<String, Object> testHarness =
-					new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
-
-			testHarness.open();
-
-			try {
-				testHarness.processElement(new StreamRecord<>("value"));
-				testHarness.processElement(new StreamRecord<>("value"));
-				fail("This should fail with an exception");
-			}
-			catch (Exception e) {
-				assertNotNull(e.getCause());
-				assertNotNull(e.getCause().getMessage());
-				assertTrue(e.getCause().getMessage().contains("Test error"));
-			}
-
-			// (2) producer that only logs errors
-
-			FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
-			producerLogging.setLogFailuresOnly(true);
-
-			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
-
-			testHarness.open();
-
-			testHarness.processElement(new StreamRecord<>("value"));
-			testHarness.processElement(new StreamRecord<>("value"));
-
-			testHarness.close();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
deleted file mode 100644
index 1802e0c..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.common.KafkaException;
-import kafka.api.PartitionMetadata;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
-
-import java.io.File;
-import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * An implementation of the KafkaServerProvider for Kafka 0.9
- */
-public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
-	private File tmpZkDir;
-	private File tmpKafkaParent;
-	private List<File> tmpKafkaDirs;
-	private List<KafkaServer> brokers;
-	private TestingServer zookeeper;
-	private String zookeeperConnectionString;
-	private String brokerConnectionString = "";
-	private Properties standardProps;
-	private Properties additionalServerProperties;
-	private boolean secureMode = false;
-	// 6 seconds is default. Seems to be too small for travis. 30 seconds
-	private String zkTimeout = "30000";
-
-	public String getBrokerConnectionString() {
-		return brokerConnectionString;
-	}
-
-	@Override
-	public Properties getStandardProperties() {
-		return standardProps;
-	}
-
-	@Override
-	public String getVersion() {
-		return "0.9";
-	}
-
-	@Override
-	public List<KafkaServer> getBrokers() {
-		return brokers;
-	}
-
-	@Override
-	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
-		return new FlinkKafkaConsumer09<>(topics, readSchema, props);
-	}
-
-	@Override
-	public <T> StreamSink<T> getProducerSink(
-			String topic,
-			KeyedSerializationSchema<T> serSchema,
-			Properties props,
-			KafkaPartitioner<T> partitioner) {
-		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
-		prod.setFlushOnCheckpoint(true);
-		return new StreamSink<>(prod);
-	}
-
-	@Override
-	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
-		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
-		prod.setFlushOnCheckpoint(true);
-		return stream.addSink(prod);
-	}
-
-	@Override
-	public KafkaOffsetHandler createOffsetHandler(Properties props) {
-		return new KafkaOffsetHandlerImpl(props);
-	}
-
-	@Override
-	public void restartBroker(int leaderId) throws Exception {
-		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
-	}
-
-	@Override
-	public int getLeaderToShutDown(String topic) throws Exception {
-		ZkUtils zkUtils = getZkUtils();
-		try {
-			PartitionMetadata firstPart = null;
-			do {
-				if (firstPart != null) {
-					LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
-					// not the first try. Sleep a bit
-					Thread.sleep(150);
-				}
-
-				Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata();
-				firstPart = partitionMetadata.head();
-			}
-			while (firstPart.errorCode() != 0);
-
-			return firstPart.leader().get().id();
-		} finally {
-			zkUtils.close();
-		}
-	}
-
-	@Override
-	public int getBrokerId(KafkaServer server) {
-		return server.config().brokerId();
-	}
-
-	@Override
-	public boolean isSecureRunSupported() {
-		return true;
-	}
-
-	@Override
-	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
-
-		//increase the timeout since in Travis ZK connection takes long time for secure connection.
-		if(secureMode) {
-			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
-			numKafkaServers = 1;
-			zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
-		}
-
-		this.additionalServerProperties = additionalServerProperties;
-		this.secureMode = secureMode;
-		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
-		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
-		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
-
-		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
-		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
-
-		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-		for (int i = 0; i < numKafkaServers; i++) {
-			File tmpDir = new File(tmpKafkaParent, "server-" + i);
-			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
-			tmpKafkaDirs.add(tmpDir);
-		}
-
-		zookeeper = null;
-		brokers = null;
-
-		try {
-			LOG.info("Starting Zookeeper");
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-			LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(numKafkaServers);
-
-			for (int i = 0; i < numKafkaServers; i++) {
-				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
-
-				SocketServer socketServer = brokers.get(i).socketServer();
-				if(secureMode) {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
-				} else {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
-				}
-			}
-
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
-		}
-
-		LOG.info("brokerConnectionString --> {}", brokerConnectionString);
-
-		standardProps = new Properties();
-		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
-		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
-		standardProps.setProperty("group.id", "flink-tests");
-		standardProps.setProperty("enable.auto.commit", "false");
-		standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
-		standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
-		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
-		standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-
-	}
-
-	@Override
-	public void shutdown() {
-		for (KafkaServer broker : brokers) {
-			if (broker != null) {
-				broker.shutdown();
-			}
-		}
-		brokers.clear();
-
-		if (zookeeper != null) {
-			try {
-				zookeeper.stop();
-				zookeeper.close();
-			}
-			catch (Exception e) {
-				LOG.warn("ZK.stop() failed", e);
-			}
-			zookeeper = null;
-		}
-
-		// clean up the temp spaces
-
-		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
-			try {
-				FileUtils.deleteDirectory(tmpKafkaParent);
-			}
-			catch (Exception e) {
-				// ignore
-			}
-		}
-		if (tmpZkDir != null && tmpZkDir.exists()) {
-			try {
-				FileUtils.deleteDirectory(tmpZkDir);
-			}
-			catch (Exception e) {
-				// ignore
-			}
-		}
-	}
-
-	public ZkUtils getZkUtils() {
-		LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
-		ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
-				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
-		return ZkUtils.apply(creator, false);
-	}
-
-	@Override
-	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
-		// create topic with one client
-		LOG.info("Creating topic {}", topic);
-
-		ZkUtils zkUtils = getZkUtils();
-		try {
-			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig);
-		} finally {
-			zkUtils.close();
-		}
-
-		LOG.info("Topic {} create request is successfully posted", topic);
-
-		// validate that the topic has been created
-		final long deadline = System.currentTimeMillis() + Integer.parseInt(zkTimeout);
-		do {
-			try {
-				if(secureMode) {
-					//increase wait time since in Travis ZK timeout occurs frequently
-					int wait = Integer.parseInt(zkTimeout) / 100;
-					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
-					Thread.sleep(wait);
-				} else {
-					Thread.sleep(100);
-				}
-
-			} catch (InterruptedException e) {
-				// restore interrupted state
-			}
-			// we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are
-			// not always correct.
-
-			LOG.info("Validating if the topic {} has been created or not", topic);
-
-			// create a new ZK utils connection
-			ZkUtils checkZKConn = getZkUtils();
-			if(AdminUtils.topicExists(checkZKConn, topic)) {
-				LOG.info("topic {} has been created successfully", topic);
-				checkZKConn.close();
-				return;
-			}
-			LOG.info("topic {} has not been created yet. Will check again...", topic);
-			checkZKConn.close();
-		}
-		while (System.currentTimeMillis() < deadline);
-		fail("Test topic could not be created");
-	}
-
-	@Override
-	public void deleteTestTopic(String topic) {
-		ZkUtils zkUtils = getZkUtils();
-		try {
-			LOG.info("Deleting topic {}", topic);
-
-			ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
-				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
-
-			AdminUtils.deleteTopic(zkUtils, topic);
-
-			zk.close();
-		} finally {
-			zkUtils.close();
-		}
-	}
-
-	/**
-	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-	 */
-	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
-		Properties kafkaProperties = new Properties();
-
-		// properties have to be Strings
-		kafkaProperties.put("advertised.host.name", KAFKA_HOST);
-		kafkaProperties.put("broker.id", Integer.toString(brokerId));
-		kafkaProperties.put("log.dir", tmpFolder.toString());
-		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
-		kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
-		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
-
-		// for CI stability, increase zookeeper session timeout
-		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
-		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
-		if(additionalServerProperties != null) {
-			kafkaProperties.putAll(additionalServerProperties);
-		}
-
-		final int numTries = 5;
-
-		for (int i = 1; i <= numTries; i++) {
-			int kafkaPort = NetUtils.getAvailablePort();
-			kafkaProperties.put("port", Integer.toString(kafkaPort));
-
-			//to support secure kafka cluster
-			if(secureMode) {
-				LOG.info("Adding Kafka secure configurations");
-				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
-				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
-				kafkaProperties.putAll(getSecureProperties());
-			}
-
-			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
-			try {
-				scala.Option<String> stringNone = scala.Option.apply(null);
-				KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
-				server.startup();
-				return server;
-			}
-			catch (KafkaException e) {
-				if (e.getCause() instanceof BindException) {
-					// port conflict, retry...
-					LOG.info("Port conflict when starting Kafka Broker. Retrying...");
-				}
-				else {
-					throw e;
-				}
-			}
-		}
-
-		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
-	}
-
-	public Properties getSecureProperties() {
-		Properties prop = new Properties();
-		if(secureMode) {
-			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
-			prop.put("security.protocol", "SASL_PLAINTEXT");
-			prop.put("sasl.kerberos.service.name", "kafka");
-
-			//add special timeout for Travis
-			prop.setProperty("zookeeper.session.timeout.ms", zkTimeout);
-			prop.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
-			prop.setProperty("metadata.fetch.timeout.ms","120000");
-		}
-		return prop;
-	}
-
-	private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
-
-		private final KafkaConsumer<byte[], byte[]> offsetClient;
-
-		public KafkaOffsetHandlerImpl(Properties props) {
-			offsetClient = new KafkaConsumer<>(props);
-		}
-
-		@Override
-		public Long getCommittedOffset(String topicName, int partition) {
-			OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
-			return (committed != null) ? committed.offset() : null;
-		}
-
-		@Override
-		public void close() {
-			offsetClient.close();
-		}
-	}
-
-}