You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:01 UTC

[11/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
deleted file mode 100644
index d015157..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.common.TopicAndPartition;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.kafka.common.Node;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.SerializedValue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A fetcher that fetches data from Kafka brokers via the Kafka 0.8 low-level consumer API.
- * The fetcher also handles the explicit communication with ZooKeeper to fetch initial offsets
- * and to write offsets to ZooKeeper.
- *
- * @param <T> The type of elements produced by the fetcher.
- */
-public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
-	
-	static final KafkaTopicPartitionState<TopicAndPartition> MARKER = 
-			new KafkaTopicPartitionState<>(new KafkaTopicPartition("n/a", -1), new TopicAndPartition("n/a", -1));
-
-	private static final Logger LOG = LoggerFactory.getLogger(Kafka08Fetcher.class);
-
-	// ------------------------------------------------------------------------
-
-	/** The schema to convert between Kafka's byte messages, and Flink's objects */
-	private final KeyedDeserializationSchema<T> deserializer;
-
-	/** The properties that configure the Kafka connection */
-	private final Properties kafkaConfig;
-
-	/** The subtask's runtime context */
-	private final RuntimeContext runtimeContext;
-
-	/** The queue of partitions that are currently not assigned to a broker connection */
-	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;
-
-	/** The behavior to use in case that an offset is not valid (any more) for a partition */
-	private final long invalidOffsetBehavior;
-
-	/** The interval in which to automatically commit (-1 if deactivated) */
-	private final long autoCommitInterval;
-
-	/** The handler that reads/writes offsets from/to ZooKeeper */
-	private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
-
-	/** Flag to track the main work loop as alive */
-	private volatile boolean running = true;
-
-
-	public Kafka08Fetcher(
-			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> assignedPartitions,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext,
-			KeyedDeserializationSchema<T> deserializer,
-			Properties kafkaProperties,
-			long invalidOffsetBehavior,
-			long autoCommitInterval,
-			boolean useMetrics) throws Exception
-	{
-		super(
-				sourceContext,
-				assignedPartitions,
-				watermarksPeriodic,
-				watermarksPunctuated,
-				runtimeContext.getProcessingTimeService(),
-				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
-				runtimeContext.getUserCodeClassLoader(),
-				useMetrics);
-
-		this.deserializer = checkNotNull(deserializer);
-		this.kafkaConfig = checkNotNull(kafkaProperties);
-		this.runtimeContext = runtimeContext;
-		this.invalidOffsetBehavior = invalidOffsetBehavior;
-		this.autoCommitInterval = autoCommitInterval;
-		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
-
-		// initially, all these partitions are not assigned to a specific broker connection
-		for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-			unassignedPartitionsQueue.add(partition);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Main Work Loop
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Runs the main work loop of the fetcher.
-	 *
-	 * <p>The method first reads start offsets from ZooKeeper for all partitions that have no
-	 * offset defined, optionally starts the periodic offset committer, and then keeps handing
-	 * unassigned partitions to the consumer threads of their leader brokers. The loop ends when
-	 * the fetcher is cancelled, or when no consumer thread is left and no partition is unassigned.
-	 * On exit (regular or exceptional), the periodic committer is shut down, all spawned consumer
-	 * threads are cancelled and joined, and the ZooKeeper handler is closed.
-	 *
-	 * @throws Exception Errors reported by the concurrent threads through the exception proxy,
-	 *                   or failures raised directly by the calls made in this method.
-	 */
-	@Override
-	public void runFetchLoop() throws Exception {
-		// the map from broker to the thread that is connected to that broker
-		final Map<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<>();
-
-		// this holds possible exceptions from the concurrent broker connection threads
-		final ExceptionProxy errorHandler = new ExceptionProxy(Thread.currentThread());
-
-		// the offset handler handles the communication with ZooKeeper, to commit externally visible offsets
-		// (the local variable stays valid for the cleanup below, after the field was reset to null)
-		final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig);
-		this.zookeeperOffsetHandler = zookeeperOffsetHandler;
-
-		PeriodicOffsetCommitter periodicCommitter = null;
-		try {
-			// read offsets from ZooKeeper for partitions that did not restore offsets
-			{
-				List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
-				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-					if (!partition.isOffsetDefined()) {
-						partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
-					}
-				}
-
-				Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
-				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-					Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition());
-					if (zkOffset != null) {
-						// the offset in ZK represents the "next record to process", so we need to subtract it by 1
-						// to correctly represent our internally checkpointed offsets
-						partition.setOffset(zkOffset - 1);
-					}
-				}
-			}
-
-			// start the periodic offset committer thread, if necessary
-			if (autoCommitInterval > 0) {
-				LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval);
-
-				periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler,
-						subscribedPartitions(), errorHandler, autoCommitInterval);
-				periodicCommitter.setName("Periodic Kafka partition offset committer");
-				periodicCommitter.setDaemon(true);
-				periodicCommitter.start();
-			}
-
-			// register offset metrics
-			if (useMetrics) {
-				final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
-				addOffsetStateGauge(kafkaMetricGroup);
-			}
-
-			// Main loop polling elements from the unassignedPartitions queue to the threads
-			while (running) {
-				// re-throw any exception from the concurrent fetcher threads
-				errorHandler.checkAndThrowException();
-
-				// wait for max 5 seconds trying to get partitions to assign
-				// if threads shut down, this poll returns earlier, because the threads inject the
-				// special marker into the queue
-				List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign =
-						unassignedPartitionsQueue.getBatchBlocking(5000);
-
-				// the batch may contain more than one marker (cancel() and the consumer threads
-				// can all inject it before this thread polls). Remove all of them: a left-over
-				// marker would otherwise be passed to the leader lookup like a real partition
-				while (partitionsToAssign.remove(MARKER)) {
-					// keep removing until no marker is left
-				}
-
-				if (!partitionsToAssign.isEmpty()) {
-					LOG.info("Assigning {} partitions to broker threads", partitionsToAssign.size());
-					Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders =
-							findLeaderForPartitions(partitionsToAssign, kafkaConfig);
-
-					// assign the partitions to the leaders (maybe start the threads)
-					for (Map.Entry<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader :
-							partitionsWithLeaders.entrySet())
-					{
-						final Node leader = partitionsWithLeader.getKey();
-						final List<KafkaTopicPartitionState<TopicAndPartition>> partitions = partitionsWithLeader.getValue();
-						SimpleConsumerThread<T> brokerThread = brokerToThread.get(leader);
-
-						// do not start further threads once the fetcher was cancelled
-						if (!running) {
-							break;
-						}
-
-						if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) {
-							// start new thread
-							brokerThread = createAndStartSimpleConsumerThread(partitions, leader, errorHandler);
-							brokerToThread.put(leader, brokerThread);
-						}
-						else {
-							// put elements into queue of thread
-							ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue =
-									brokerThread.getNewPartitionsQueue();
-
-							for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
-								if (!newPartitionsQueue.addIfOpen(fp)) {
-									// we were unable to add the partition to the broker's queue
-									// the broker has closed in the meantime (the thread will shut down)
-									// create a new thread for connecting to this broker
-									List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<>();
-									seedPartitions.add(fp);
-									brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler);
-									brokerToThread.put(leader, brokerThread);
-									newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions
-								}
-							}
-						}
-					}
-				}
-				else {
-					// there were no partitions to assign. Check if any broker threads shut down.
-					// we get into this section of the code, if either the poll timed out, or the
-					// blocking poll was woken up by the marker element
-					Iterator<SimpleConsumerThread<T>> bttIterator = brokerToThread.values().iterator();
-					while (bttIterator.hasNext()) {
-						SimpleConsumerThread<T> thread = bttIterator.next();
-						if (!thread.getNewPartitionsQueue().isOpen()) {
-							LOG.info("Removing stopped consumer thread {}", thread.getName());
-							bttIterator.remove();
-						}
-					}
-				}
-
-				if (brokerToThread.size() == 0 && unassignedPartitionsQueue.isEmpty()) {
-					if (unassignedPartitionsQueue.close()) {
-						LOG.info("All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher");
-						break;
-					}
-					// we end up here if somebody added something to the queue in the meantime --> continue to poll queue again
-				}
-			}
-		}
-		catch (InterruptedException e) {
-			// this may be thrown because an exception on one of the concurrent fetcher threads
-			// woke this thread up. make sure we throw the root exception instead in that case
-			errorHandler.checkAndThrowException();
-
-			// no other root exception, throw the interrupted exception
-			throw e;
-		}
-		finally {
-			this.running = false;
-			this.zookeeperOffsetHandler = null;
-
-			// if we run a periodic committer thread, shut that down
-			if (periodicCommitter != null) {
-				periodicCommitter.shutdown();
-			}
-
-			// clear the interruption flag
-			// this allows the joining on consumer threads (on best effort) to happen even if
-			// this thread was already interrupted before it got here
-			Thread.interrupted();
-
-			// make sure that in any case (completion, abort, error), all spawned threads are stopped
-			try {
-				int runningThreads;
-				do {
-					// check whether threads are alive and cancel them
-					runningThreads = 0;
-					Iterator<SimpleConsumerThread<T>> threads = brokerToThread.values().iterator();
-					while (threads.hasNext()) {
-						SimpleConsumerThread<?> t = threads.next();
-						if (t.isAlive()) {
-							t.cancel();
-							runningThreads++;
-						} else {
-							threads.remove();
-						}
-					}
-
-					// wait for the threads to finish, before issuing a cancel call again
-					// (the +1 keeps the timeout positive, a join(0) would wait without timeout)
-					if (runningThreads > 0) {
-						for (SimpleConsumerThread<?> t : brokerToThread.values()) {
-							t.join(500 / runningThreads + 1);
-						}
-					}
-				}
-				while (runningThreads > 0);
-			}
-			catch (InterruptedException ignored) {
-				// waiting for the thread shutdown apparently got interrupted
-				// restore interrupted state and continue
-				Thread.currentThread().interrupt();
-			}
-			catch (Throwable t) {
-				// we catch all here to preserve the original exception
-				LOG.error("Exception while shutting down consumer threads", t);
-			}
-
-			try {
-				zookeeperOffsetHandler.close();
-			}
-			catch (Throwable t) {
-				// we catch all here to preserve the original exception
-				LOG.error("Exception while shutting down ZookeeperOffsetHandler", t);
-			}
-		}
-	}
-
-	/**
-	 * Cancels the fetcher: marks the main work loop as no longer running and wakes up
-	 * a main thread that may currently be blocked on the queue of unassigned partitions.
-	 */
-	@Override
-	public void cancel() {
-		// flip the flag first, so the work loop observes it as soon as it wakes up
-		running = false;
-
-		// the marker element unblocks the poll on the queue (no-op if the queue is closed already)
-		unassignedPartitionsQueue.addIfOpen(MARKER);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Kafka 0.8 specific class instantiation
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates the Kafka 0.8 specific handle for the given partition.
-	 *
-	 * @param partition The Flink-side description of the partition.
-	 * @return A {@code TopicAndPartition} with the same topic and partition number.
-	 */
-	@Override
-	public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
-		final String topic = partition.getTopic();
-		final int partitionId = partition.getPartition();
-		return new TopicAndPartition(topic, partitionId);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Offset handling
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Commits the given offsets through the ZooKeeper offset handler (if the handler is
-	 * currently set) and records them as committed offsets in the partition states.
-	 *
-	 * @param offsets The offsets to commit, per partition.
-	 * @throws Exception Forwarded from the ZooKeeper offset handler.
-	 */
-	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
-		// read the volatile field only once, the fetch loop resets it to null when it exits
-		final ZookeeperOffsetHandler handler = zookeeperOffsetHandler;
-		if (handler != null) {
-			// the ZK handler takes care of incrementing the offsets by 1 before committing
-			handler.prepareAndCommitOffsets(offsets);
-		}
-
-		// remember the committed offset in the state of every partition that is part of the commit
-		for (KafkaTopicPartitionState<TopicAndPartition> state : subscribedPartitions()) {
-			final Long committed = offsets.get(state.getKafkaTopicPartition());
-			if (committed == null) {
-				continue;
-			}
-			state.setCommittedOffset(committed);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a consumer thread for the given broker, seeds it with the given partitions,
-	 * and starts it as a daemon thread.
-	 *
-	 * @param seedPartitions The partitions the thread starts with.
-	 * @param leader The broker the thread connects to.
-	 * @param errorHandler The proxy handed to the thread for error reporting.
-	 * @return The started thread.
-	 */
-	private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
-			List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
-			Node leader,
-			ExceptionProxy errorHandler) throws IOException, ClassNotFoundException
-	{
-		// the deserializer is not necessarily thread safe, so every thread works on a private copy
-		final KeyedDeserializationSchema<T> threadLocalDeserializer =
-				InstantiationUtil.clone(deserializer, runtimeContext.getUserCodeClassLoader());
-
-		// the thread gets its initial partitions right away (otherwise it would shut down immediately again)
-		final SimpleConsumerThread<T> consumerThread = new SimpleConsumerThread<>(
-				this,
-				errorHandler,
-				kafkaConfig,
-				leader,
-				seedPartitions,
-				unassignedPartitionsQueue,
-				threadLocalDeserializer,
-				invalidOffsetBehavior);
-
-		final String threadName = String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
-				runtimeContext.getTaskName(), leader.id(), leader.host(), leader.port());
-
-		consumerThread.setName(threadName);
-		consumerThread.setDaemon(true);
-		consumerThread.start();
-
-		LOG.info("Starting thread {}", consumerThread.getName());
-		return consumerThread;
-	}
-
-	/**
-	 * Returns a list of unique topics from for the given partitions
-	 *
-	 * @param partitions A the partitions
-	 * @return A list of unique topics
-	 */
-	private static List<String> getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
-		HashSet<String> uniqueTopics = new HashSet<>();
-		for (KafkaTopicPartitionState<TopicAndPartition> fp: partitions) {
-			uniqueTopics.add(fp.getTopic());
-		}
-		return new ArrayList<>(uniqueTopics);
-	}
-
-	/**
-	 * Find leaders for the partitions
-	 *
-	 * From a high level, the method does the following:
-	 *	 - Get a list of FetchPartitions (usually only a few partitions)
-	 *	 - Get the list of topics from the FetchPartitions list and request the partitions for the topics. (Kafka doesn't support getting leaders for a set of partitions)
-	 *	 - Build a Map<Leader, List<FetchPartition>> where only the requested partitions are contained.
-	 *
-	 * @param partitionsToAssign fetch partitions list
-	 * @return leader to partitions map
-	 */
-	private static Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(
-			List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign,
-			Properties kafkaProperties) throws Exception
-	{
-		if (partitionsToAssign.isEmpty()) {
-			throw new IllegalArgumentException("Leader request for empty partitions list");
-		}
-
-		LOG.info("Refreshing leader information for partitions {}", partitionsToAssign);
-		
-		// this request is based on the topic names
-		PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(getTopics(partitionsToAssign), kafkaProperties);
-		infoFetcher.start();
-
-		// NOTE: The kafka client apparently locks itself up sometimes
-		// when it is interrupted, so we run it only in a separate thread.
-		// since it sometimes refuses to shut down, we resort to the admittedly harsh
-		// means of killing the thread after a timeout.
-		KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
-		watchDog.start();
-
-		// this list contains ALL partitions of the requested topics
-		List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
-
-		// copy list to track unassigned partitions
-		List<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions = new ArrayList<>(partitionsToAssign);
-
-		// final mapping from leader -> list(fetchPartition)
-		Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> leaderToPartitions = new HashMap<>();
-
-		for(KafkaTopicPartitionLeader partitionLeader: topicPartitionWithLeaderList) {
-			if (unassignedPartitions.size() == 0) {
-				// we are done: all partitions are assigned
-				break;
-			}
-
-			Iterator<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsIterator = unassignedPartitions.iterator();
-			while (unassignedPartitionsIterator.hasNext()) {
-				KafkaTopicPartitionState<TopicAndPartition> unassignedPartition = unassignedPartitionsIterator.next();
-
-				if (unassignedPartition.getKafkaTopicPartition().equals(partitionLeader.getTopicPartition())) {
-					// we found the leader for one of the fetch partitions
-					Node leader = partitionLeader.getLeader();
-
-					List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = leaderToPartitions.get(leader);
-					if (partitionsOfLeader == null) {
-						partitionsOfLeader = new ArrayList<>();
-						leaderToPartitions.put(leader, partitionsOfLeader);
-					}
-					partitionsOfLeader.add(unassignedPartition);
-					unassignedPartitionsIterator.remove(); // partition has been assigned
-					break;
-				}
-			}
-		}
-
-		if (unassignedPartitions.size() > 0) {
-			throw new RuntimeException("Unable to find a leader for partitions: " + unassignedPartitions);
-		}
-
-		LOG.debug("Partitions with assigned leaders {}", leaderToPartitions);
-
-		return leaderToPartitions;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
deleted file mode 100644
index 4d61e53..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-/**
- * A watch dog thread that forcibly kills another thread, if that thread does not
- * finish in time.
- *
- * <p>This uses the discouraged {@link Thread#stop()} method. While this is not
- * advisable, this watch dog is only for extreme cases of threads that simply
- * do not terminate otherwise.
- *
- * <p>The timeout is measured with {@link System#nanoTime()}, so adjustments of the
- * system wall clock neither shorten nor prolong the grace period.
- */
-class KillerWatchDog extends Thread {
-
-	private static final long NANOS_PER_MILLI = 1000000L;
-
-	/** The thread to watch, and to kill if it outlives the timeout */
-	private final Thread toKill;
-
-	/** The time (in milliseconds) that the watched thread gets to finish */
-	private final long timeout;
-
-	/**
-	 * Creates the (daemon) watch dog. The watch dog starts to count when it is started.
-	 *
-	 * @param toKill The thread to watch.
-	 * @param timeout The time in milliseconds after which the watched thread is killed.
-	 */
-	KillerWatchDog(Thread toKill, long timeout) {
-		super("KillerWatchDog");
-		setDaemon(true);
-
-		this.toKill = toKill;
-		this.timeout = timeout;
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public void run() {
-		final long startNanos = System.nanoTime();
-
-		// guard the conversion against overflow for extremely large timeouts
-		final long timeoutNanos = timeout > Long.MAX_VALUE / NANOS_PER_MILLI
-				? Long.MAX_VALUE
-				: timeout * NANOS_PER_MILLI;
-
-		while (toKill.isAlive()) {
-			final long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
-			if (remainingNanos <= 0) {
-				break;
-			}
-
-			try {
-				// always wait at least one millisecond: join(0) would wait without a timeout
-				toKill.join(remainingNanos / NANOS_PER_MILLI + 1);
-			}
-			catch (InterruptedException e) {
-				// ignore here, our job is important!
-			}
-		}
-
-		// this is harsh, but this watchdog is a last resort
-		if (toKill.isAlive()) {
-			toKill.stop();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
deleted file mode 100644
index d8d927d..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-
-import java.util.List;
-import java.util.Properties;
-
-class PartitionInfoFetcher extends Thread {
-
-	private final List<String> topics;
-	private final Properties properties;
-
-	private volatile List<KafkaTopicPartitionLeader> result;
-	private volatile Throwable error;
-
-
-	PartitionInfoFetcher(List<String> topics, Properties properties) {
-		this.topics = topics;
-		this.properties = properties;
-	}
-
-	@Override
-	public void run() {
-		try {
-			result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
-		}
-		catch (Throwable t) {
-			this.error = t;
-		}
-	}
-
-	public List<KafkaTopicPartitionLeader> getPartitions() throws Exception {
-		try {
-			this.join();
-		}
-		catch (InterruptedException e) {
-			throw new Exception("Partition fetching was cancelled before completion");
-		}
-
-		if (error != null) {
-			throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error);
-		}
-		if (result != null) {
-			return result;
-		}
-		throw new Exception("Partition fetching failed");
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
deleted file mode 100644
index 27d90f2..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.util.HashMap;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A thread that writes the current offsets of a set of Kafka partitions to ZooKeeper
- * in a fixed interval, until it is shut down.
- */
-public class PeriodicOffsetCommitter extends Thread {
-
-	/** The handler used to commit the offsets */
-	private final ZookeeperOffsetHandler offsetHandler;
-
-	/** The states of the partitions whose offsets are committed */
-	private final KafkaTopicPartitionState<?>[] partitionStates;
-
-	/** The proxy that receives errors of this thread */
-	private final ExceptionProxy errorHandler;
-
-	/** The time between two commits, in milliseconds */
-	private final long commitInterval;
-
-	/** Set to false by {@link #shutdown()} to end the commit loop */
-	private volatile boolean running = true;
-
-	/**
-	 * Creates the committer thread. The thread is not started by the constructor.
-	 *
-	 * @param offsetHandler The handler used to commit the offsets, must not be null.
-	 * @param partitionStates The states of the partitions to commit, must not be null.
-	 * @param errorHandler The proxy that receives errors of this thread, must not be null.
-	 * @param commitInterval The time between two commits in milliseconds, must be positive.
-	 */
-	PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler,
-			KafkaTopicPartitionState<?>[] partitionStates,
-			ExceptionProxy errorHandler,
-			long commitInterval)
-	{
-		this.offsetHandler = checkNotNull(offsetHandler);
-		this.partitionStates = checkNotNull(partitionStates);
-		this.errorHandler = checkNotNull(errorHandler);
-		this.commitInterval = commitInterval;
-
-		checkArgument(commitInterval > 0);
-	}
-
-	@Override
-	public void run() {
-		try {
-			while (running) {
-				Thread.sleep(commitInterval);
-				offsetHandler.prepareAndCommitOffsets(snapshotCurrentOffsets());
-			}
-		}
-		catch (Throwable t) {
-			// errors after the shutdown (like the interruption by shutdown() itself) are not reported
-			if (!running) {
-				return;
-			}
-			errorHandler.reportError(
-					new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
-		}
-	}
-
-	/**
-	 * Stops the commit loop and interrupts the thread, in case it is currently sleeping.
-	 */
-	public void shutdown() {
-		this.running = false;
-		this.interrupt();
-	}
-
-	/**
-	 * Copies the current offset of every partition state into a new map.
-	 */
-	private HashMap<KafkaTopicPartition, Long> snapshotCurrentOffsets() {
-		final HashMap<KafkaTopicPartition, Long> snapshot = new HashMap<>(partitionStates.length);
-		for (int i = 0; i < partitionStates.length; i++) {
-			final KafkaTopicPartitionState<?> state = partitionStates[i];
-			snapshot.put(state.getKafkaTopicPartition(), state.getOffset());
-		}
-		return snapshot;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
deleted file mode 100644
index 35e491a..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.ErrorMapping;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.StringUtils;
-
-import org.apache.kafka.common.Node;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static java.util.Objects.requireNonNull;
-import static org.apache.flink.util.PropertiesUtil.getInt;
-
-/**
- * This class implements a thread with a connection to a single Kafka broker. The thread
- * pulls records for a set of topic partitions for which the connected broker is currently
- * the leader. The thread deserializes these records and emits them. 
- * 
- * @param <T> The type of elements that this consumer thread creates from Kafka's byte messages
- *            and emits into the Flink DataStream.
- */
-class SimpleConsumerThread<T> extends Thread {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class);
-
-	private static final KafkaTopicPartitionState<TopicAndPartition> MARKER = Kafka08Fetcher.MARKER;
-	
-	// ------------------------------------------------------------------------
-
-	private final Kafka08Fetcher<T> owner;
-	
-	private final KeyedDeserializationSchema<T> deserializer;
-
-	private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions;
-
-	private final Node broker;
-
-	/** Queue containing new fetch partitions for the consumer thread */
-	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue;
-	
-	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions;
-	
-	private final ExceptionProxy errorHandler;
-	
-	private final long invalidOffsetBehavior;
-	
-	private volatile boolean running = true;
-	
-
-	// ----------------- Simple Consumer ----------------------
-	private volatile SimpleConsumer consumer;
-
-	private final int soTimeout;
-	private final int minBytes;
-	private final int maxWait;
-	private final int fetchSize;
-	private final int bufferSize;
-	private final int reconnectLimit;
-
-
-	// exceptions are thrown locally
-	/**
-	 * Creates a consumer thread for the given broker. No connection is opened here; the Kafka
-	 * consumer is created when the thread runs.
-	 *
-	 * @param owner The fetcher that the deserialized records are emitted through.
-	 * @param errorHandler The proxy that failures of this thread are reported to.
-	 * @param config The properties from which the socket and fetch settings are read.
-	 * @param broker The broker this thread connects to.
-	 * @param seedPartitions The partitions to start fetching with. The list is used directly
-	 *                       (it is not copied) and is modified by the work loop.
-	 * @param unassignedPartitions The queue into which partitions are handed back; must not be null.
-	 * @param deserializer The schema that turns key/value bytes into records; must not be null.
-	 * @param invalidOffsetBehavior The time constant that is sent to Kafka when offsets are requested.
-	 */
-	public SimpleConsumerThread(
-			Kafka08Fetcher<T> owner,
-			ExceptionProxy errorHandler,
-			Properties config,
-			Node broker,
-			List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
-			ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions,
-			KeyedDeserializationSchema<T> deserializer,
-			long invalidOffsetBehavior)
-	{
-		// the two null-checked collaborators come first
-		this.deserializer = requireNonNull(deserializer);
-		this.unassignedPartitions = requireNonNull(unassignedPartitions);
-
-		// the remaining collaborators are stored as given
-		this.owner = owner;
-		this.errorHandler = errorHandler;
-		this.broker = broker;
-		this.partitions = seedPartitions;
-		this.invalidOffsetBehavior = invalidOffsetBehavior;
-		this.newPartitionsQueue = new ClosableBlockingQueue<>();
-
-		// these are the actual configuration values of Kafka + their original default values.
-		this.soTimeout = getInt(config, "socket.timeout.ms", 30000);
-		this.minBytes = getInt(config, "fetch.min.bytes", 1);
-		this.maxWait = getInt(config, "fetch.wait.max.ms", 100);
-		this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
-		this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
-		this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
-	}
-
-	/**
-	 * Gets the queue that this thread polls in its work loop for additional partitions to fetch from.
-	 *
-	 * @return The queue of newly assigned partitions of this thread.
-	 */
-	public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {
-		return this.newPartitionsQueue;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  main work loop
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void run() { // connects to the broker, then fetches, deserializes and emits records until stopped
-		LOG.info("Starting to fetch from {}", this.partitions);
-
-		// the client id used for the consumer connection and for every fetch request
-		final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
-
-		try {
-			// create the Kafka consumer that we actually use for fetching
-			consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
-			
-			// make sure that all partitions have some offsets to start with
-			// those partitions whose offset is still undefined get their
-			// start offset by asking the Kafka broker (not ZooKeeper)
-			getMissingOffsetsFromKafka(partitions);
-
-			// Now, the actual work starts :-)
-			int offsetOutOfRangeCount = 0; // out-of-range responses seen since the last error-free fetch
-			int reconnects = 0; // ClosedChannelExceptions seen since the last fetch that returned
-			while (running) {
-
-				// ----------------------------------- partitions list maintenance ----------------------------
-
-				// check queue for new partitions to read from:
-				List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch();
-				if (newPartitions != null) {
-					// found some new partitions for this thread's broker
-					
-					// check if the new partitions need an offset lookup
-					getMissingOffsetsFromKafka(newPartitions);
-					
-					// add the new partitions (and check they are not already in there)
-					for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
-						if (partitions.contains(newPartition)) {
-							throw new IllegalStateException("Adding partition " + newPartition + 
-									" to subscribed partitions even though it is already subscribed");
-						}
-						partitions.add(newPartition);
-					}
-					
-					LOG.info("Adding {} new partitions to consumer thread {}", newPartitions.size(), getName());
-					LOG.debug("Partitions list: {}", newPartitions); // note: logs only the newly added partitions
-				}
-
-				if (partitions.size() == 0) {
-					if (newPartitionsQueue.close()) {
-						// close succeeded. Closing thread
-						running = false;
-						
-						LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.", 
-								getName());
-
-						// add the wake-up marker into the queue to make the main thread
-						// wake up immediately and terminate faster
-						unassignedPartitions.add(MARKER);
-
-						break;
-					} else {
-						// close failed: fetcher main thread concurrently added new partitions into the queue.
-						// go to top of loop again and get the new partitions
-						continue; 
-					}
-				}
-
-				// ----------------------------------- request / response with kafka ----------------------------
-
-				FetchRequestBuilder frb = new FetchRequestBuilder();
-				frb.clientId(clientId);
-				frb.maxWait(maxWait);
-				frb.minBytes(minBytes);
-
-				for (KafkaTopicPartitionState<?> partition : partitions) { // one fetch entry per subscribed partition
-					frb.addFetch(
-							partition.getKafkaTopicPartition().getTopic(),
-							partition.getKafkaTopicPartition().getPartition(),
-							partition.getOffset() + 1, // request the next record
-							fetchSize);
-				}
-				
-				kafka.api.FetchRequest fetchRequest = frb.build();
-				LOG.debug("Issuing fetch request {}", fetchRequest);
-
-				FetchResponse fetchResponse;
-				try {
-					fetchResponse = consumer.fetch(fetchRequest);
-				}
-				catch (Throwable cce) { // only a ClosedChannelException is retried; anything else is rethrown below
-					//noinspection ConstantConditions
-					if (cce instanceof ClosedChannelException) {
-						LOG.warn("Fetch failed because of ClosedChannelException.");
-						LOG.debug("Full exception", cce);
-						
-						// we don't know if the broker is overloaded or unavailable.
-						// retry a few times, then return ALL partitions for new leader lookup
-						if (++reconnects >= reconnectLimit) {
-							LOG.warn("Unable to reach broker after {} retries. Returning all current partitions", reconnectLimit);
-							for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) {
-								unassignedPartitions.add(fp);
-							}
-							this.partitions.clear();
-							continue; // jump to top of loop: will close thread or subscribe to new partitions
-						}
-						try {
-							consumer.close();
-						} catch (Throwable t) {
-							LOG.warn("Error while closing consumer connection", t);
-						}
-						// delay & retry
-						Thread.sleep(100);
-						consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); // fresh connection to the same broker
-						continue; // retry
-					} else {
-						throw cce; // ends up in the outer catch, which reports it to the error handler
-					}
-				}
-				reconnects = 0; // the fetch call returned: reset the reconnect counter
-
-				// ---------------------------------------- error handling ----------------------------
-
-				if (fetchResponse == null) {
-					throw new IOException("Fetch from Kafka failed (request returned null)");
-				}
-				
-				if (fetchResponse.hasError()) {
-					String exception = ""; // collects the messages of all errors that are not handled below
-					List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
-					
-					// iterate over partitions to get individual error codes
-					Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
-					boolean partitionsRemoved = false;
-					
-					while (partitionsIterator.hasNext()) {
-						final KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next();
-						short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition());
-
-						if (code == ErrorMapping.OffsetOutOfRangeCode()) {
-							// we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
-							// Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
-							partitionsToGetOffsetsFor.add(fp);
-						}
-						else if (code == ErrorMapping.NotLeaderForPartitionCode() ||
-								code == ErrorMapping.LeaderNotAvailableCode() ||
-								code == ErrorMapping.BrokerNotAvailableCode() ||
-								code == ErrorMapping.UnknownCode())
-						{
-							// the broker we are connected to is not the leader for the partition.
-							LOG.warn("{} is not the leader of {}. Reassigning leader for partition", broker, fp);
-							LOG.debug("Error code = {}", code);
-
-							unassignedPartitions.add(fp); // hand the partition back through the unassigned queue
-
-							partitionsIterator.remove(); // unsubscribe the partition ourselves
-							partitionsRemoved = true;
-						}
-						else if (code != ErrorMapping.NoError()) {
-							exception += "\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " +
-									StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
-						}
-					}
-					if (partitionsToGetOffsetsFor.size() > 0) {
-						// safeguard against an infinite loop.
-						if (offsetOutOfRangeCount++ > 3) { // compares before incrementing: throws on the fifth occurrence since the last reset
-							throw new RuntimeException("Found invalid offsets more than three times in partitions "
-									+ partitionsToGetOffsetsFor + " Exceptions: " + exception);
-						}
-						// get valid offsets for these partitions and try again.
-						LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
-						getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
-						
-						LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
-						continue; // jump back to create a new fetch request. The offset has not been touched.
-					}
-					else if (partitionsRemoved) {
-						continue; // create new fetch request
-					}
-					else {
-						// partitions failed on an error
-						throw new IOException("Error while fetching from broker '" + broker +"': " + exception);
-					}
-				} else {
-					// successful fetch, reset offsetOutOfRangeCount.
-					offsetOutOfRangeCount = 0;
-				}
-
-				// ----------------------------------- process fetch response ----------------------------
-
-				int messagesInFetch = 0; // counted for the debug log at the end of the iteration only
-				int deletedMessages = 0; // messages with a null payload
-				Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
-				
-				partitionsLoop:
-				while (partitionsIterator.hasNext()) {
-					final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next();
-					
-					final ByteBufferMessageSet messageSet = fetchResponse.messageSet(
-							currentPartition.getTopic(), currentPartition.getPartition());
-
-					for (MessageAndOffset msg : messageSet) {
-						if (running) { // re-checked for every message so that a cancel takes effect quickly
-							messagesInFetch++;
-							final ByteBuffer payload = msg.message().payload();
-							final long offset = msg.offset();
-							
-							if (offset <= currentPartition.getOffset()) {
-								// we have seen this message already
-								LOG.info("Skipping message with offset " + msg.offset()
-										+ " because we have seen messages until (including) "
-										+ currentPartition.getOffset()
-										+ " from topic/partition " + currentPartition.getTopic() + '/'
-										+ currentPartition.getPartition() + " already");
-								continue;
-							}
-
-							// If the message value is null, this represents a delete command for the message key.
-							// Log this and pass it on to the client who might want to also receive delete messages.
-							byte[] valueBytes;
-							if (payload == null) {
-								deletedMessages++;
-								valueBytes = null;
-							} else {
-								valueBytes = new byte[payload.remaining()];
-								payload.get(valueBytes);
-							}
-
-							// put key into byte array
-							byte[] keyBytes = null;
-							int keySize = msg.message().keySize();
-
-							if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization
-								ByteBuffer keyPayload = msg.message().key();
-								keyBytes = new byte[keySize];
-								keyPayload.get(keyBytes);
-							}
-
-							final T value = deserializer.deserialize(keyBytes, valueBytes, 
-									currentPartition.getTopic(), currentPartition.getPartition(), offset);
-							
-							if (deserializer.isEndOfStream(value)) {
-								// remove partition from subscribed partitions.
-								partitionsIterator.remove();
-								continue partitionsLoop; // the end-of-stream record is not emitted; rest of this message set is skipped
-							}
-							
-							owner.emitRecord(value, currentPartition, offset);
-						}
-						else {
-							// no longer running
-							return; // the finally block below still closes the consumer
-						}
-					}
-				}
-				LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages);
-			} // end of fetch loop
-
-			if (!newPartitionsQueue.close()) { // reached when the loop ends via the running flag or the break above
-				throw new Exception("Bug: Cleanly leaving fetcher thread without having a closed queue.");
-			}
-		}
-		catch (Throwable t) {
-			// report to the fetcher's error handler
-			errorHandler.reportError(t);
-		}
-		finally {
-			if (consumer != null) {
-				// closing the consumer should not fail the program
-				try {
-					consumer.close();
-				}
-				catch (Throwable t) {
-					LOG.error("Error while closing the Kafka simple consumer", t);
-				}
-			}
-		}
-	}
-
-	private void getMissingOffsetsFromKafka(
-			List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
-	{
-		// collect which partitions we should fetch offsets for
-		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
-		for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
-			if (!part.isOffsetDefined()) {
-				// retrieve the offset from the consumer
-				partitionsToGetOffsetsFor.add(part);
-			}
-		}
-		
-		if (partitionsToGetOffsetsFor.size() > 0) {
-			getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
-			
-			LOG.info("No checkpoint/savepoint offsets found for some partitions. " +
-					"Fetched the following start offsets {}", partitionsToGetOffsetsFor);
-		}
-	}
-
-	/**
-	 * Cancels this fetch thread. The thread will release all resources and terminate.
-	 */
-	public void cancel() {
-		this.running = false;
-
-		// interrupt whatever the consumer is doing
-		if (consumer != null) {
-			consumer.close();
-		}
-
-		this.interrupt();
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Kafka Request Utils
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Requests offsets for a set of partitions, via a Kafka consumer, and stores the result in
-	 * the given partition states.
-	 *
-	 * <p>Which offset is requested is determined by {@code whichTime}. The request is sent up to
-	 * three times in total: as long as the response reports an error it is repeated immediately,
-	 * and after the third failed attempt an {@link IOException} is thrown.
-	 *
-	 * <p>The offset returned by Kafka is that of the next record to fetch. Because the partition
-	 * state reflects the latest successfully emitted record, the state is set to that offset
-	 * minus one.
-	 *
-	 * @param consumer The consumer connected to lead broker
-	 * @param partitions The list of partitions we need offsets for
-	 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
-	 * @throws IOException Thrown if the response still has an error after the third attempt, or if
-	 *                     the response contains no offset for one of the partitions.
-	 */
-	private static void getLastOffsetFromKafka(
-			SimpleConsumer consumer,
-			List<KafkaTopicPartitionState<TopicAndPartition>> partitions,
-			long whichTime) throws IOException
-	{
-		// ask for exactly one offset per partition
-		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
-		for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
-			requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1));
-		}
-
-		int retries = 0;
-		OffsetResponse response;
-		while (true) {
-			kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
-					requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
-			response = consumer.getOffsetsBefore(request);
-
-			if (!response.hasError()) {
-				break; // leave retry loop
-			}
-
-			// collect the error of every partition that failed, for the log / exception message
-			StringBuilder exception = new StringBuilder();
-			for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
-				final short code = response.errorCode(part.getTopic(), part.getPartition());
-				if (code != ErrorMapping.NoError()) {
-					exception.append("\nException for topic=").append(part.getTopic())
-							.append(" partition=").append(part.getPartition()).append(": ")
-							.append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
-				}
-			}
-
-			if (++retries >= 3) {
-				throw new IOException("Unable to get last offset for partitions " + partitions + ": "
-						+ exception.toString());
-			} else {
-				LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception);
-			}
-		}
-
-		for (KafkaTopicPartitionState<TopicAndPartition> part: partitions) {
-			final long[] offsets = response.offsets(part.getTopic(), part.getPartition());
-
-			// fail with a descriptive error instead of an ArrayIndexOutOfBoundsException
-			if (offsets == null || offsets.length == 0) {
-				throw new IOException("Kafka returned no offset for topic=" + part.getTopic()
-						+ " partition=" + part.getPartition());
-			}
-
-			// the offset returned is that of the next record to fetch. because our state reflects the latest
-			// successfully emitted record, we subtract one
-			part.setOffset(offsets[0] - 1);
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
deleted file mode 100644
index 8f2ef09..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.utils.ZKGroupTopicDirs;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Handler for committing Kafka offsets to Zookeeper and to retrieve them again.
- *
- * <p>An offset is stored as the decimal string of the number, in the node
- * {@code <consumer offset dir of group and topic>/<partition>}.
- */
-public class ZookeeperOffsetHandler {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
-
-	private final String groupId;
-
-	private final CuratorFramework curatorClient;
-
-
-	/**
-	 * Creates the handler and starts its Curator client.
-	 *
-	 * @param props The consumer properties. The group id and {@code zookeeper.connect} are
-	 *              required; the timeouts and the retry policy settings are optional.
-	 * @throws IllegalArgumentException Thrown, if a required property is missing.
-	 * @throws NumberFormatException Thrown, if one of the numeric properties is not an integer.
-	 */
-	public ZookeeperOffsetHandler(Properties props) {
-		this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-		if (this.groupId == null) {
-			throw new IllegalArgumentException("Required property '"
-					+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
-		}
-		
-		String zkConnect = props.getProperty("zookeeper.connect");
-		if (zkConnect == null) {
-			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
-		}
-
-		// we use Curator's default timeouts
-		int sessionTimeoutMs = Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "60000"));
-		int connectionTimeoutMs = Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
-		
-		// undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are no official kafka configs)
-		int backoffBaseSleepTime = Integer.parseInt(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
-		int backoffMaxRetries = Integer.parseInt(props.getProperty("flink.zookeeper.max-retries", "10"));
-		
-		RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
-		curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
-		curatorClient.start();
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Offset access and manipulation
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Commits offsets for Kafka partitions to ZooKeeper. The given offsets to this method should be the offsets of
-	 * the last processed records; this method will take care of incrementing the offsets by 1 before committing them so
-	 * that the committed offsets to Zookeeper represent the next record to process.
-	 *
-	 * <p>Entries whose offset is {@code null} or negative are skipped.
-	 * 
-	 * @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit.
-	 * @throws Exception The method forwards exceptions.
-	 */
-	public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception {
-		for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
-			KafkaTopicPartition tp = entry.getKey();
-
-			Long lastProcessedOffset = entry.getValue();
-			if (lastProcessedOffset != null && lastProcessedOffset >= 0) {
-				setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
-			}
-		}
-	}
-
-	/**
-	 * Reads the offsets stored in ZooKeeper for the given partitions. Partitions for which no
-	 * (valid) offset is stored are missing from the result.
-	 *
-	 * @param partitions The partitions to read offsets for.
-	 * @return The mapping from partition to offset.
-	 * @throws Exception This method forwards exceptions.
-	 */
-	public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
-		Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size());
-		for (KafkaTopicPartition tp : partitions) {
-			Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
-
-			if (offset != null) {
-				LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.",
-						tp.getTopic(), tp.getPartition(), offset);
-				ret.put(tp, offset);
-			}
-		}
-		return ret;
-	}
-
-	/**
-	 * Closes the offset handler.
-	 * 
-	 * @throws IOException Thrown, if the handler cannot be closed properly.
-	 */
-	public void close() throws IOException {
-		curatorClient.close();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Communication with Zookeeper
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Writes the given offset, as a decimal string, into the offset node of the partition.
-	 * The node is created first if it does not exist.
-	 *
-	 * @param curatorClient The client used to talk to ZooKeeper.
-	 * @param groupId The consumer group id.
-	 * @param topic The topic of the partition.
-	 * @param partition The id of the partition.
-	 * @param offset The offset to write; it is written as given.
-	 * @throws Exception This method forwards exceptions.
-	 */
-	public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
-		String path = offsetPath(groupId, topic, partition);
-		curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
-		// explicit charset, so that the stored bytes do not depend on the platform default
-		byte[] data = Long.toString(offset).getBytes(StandardCharsets.UTF_8);
-		curatorClient.setData().forPath(path, data);
-	}
-
-	/**
-	 * Reads the offset stored in the offset node of the partition. The node is created first
-	 * if it does not exist.
-	 *
-	 * @param curatorClient The client used to talk to ZooKeeper.
-	 * @param groupId The consumer group id.
-	 * @param topic The topic of the partition.
-	 * @param partition The id of the partition.
-	 * @return The stored offset, or {@code null} if the node holds no data, an empty string,
-	 *         or a string that is not a number (the latter is logged as an error).
-	 * @throws Exception This method forwards exceptions.
-	 */
-	public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
-		String path = offsetPath(groupId, topic, partition);
-		curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
-		
-		byte[] data = curatorClient.getData().forPath(path);
-		if (data == null) {
-			return null;
-		}
-
-		// explicit charset, matching the one used when writing
-		String asString = new String(data, StandardCharsets.UTF_8);
-		if (asString.length() == 0) {
-			return null;
-		}
-
-		try {
-			return Long.valueOf(asString);
-		}
-		catch (NumberFormatException e) {
-			LOG.error(
-					"The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
-					groupId, topic, partition, asString);
-			return null;
-		}
-	}
-
-	/** Builds the path of the ZooKeeper node that holds the offset of one partition. */
-	private static String offsetPath(String groupId, String topic, int partition) {
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
-		return topicDirs.consumerOffsetDir() + "/" + partition;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
deleted file mode 100644
index fabb0fe..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class Kafka08ITCase extends KafkaConsumerTestBase {
-
-	// ------------------------------------------------------------------------
-	//  Suite of Tests
-	// ------------------------------------------------------------------------
-
-	@Test(timeout = 60000)
-	public void testFailOnNoBroker() throws Exception {
-		runFailOnNoBrokerTest();
-	}
-
-
-	@Test(timeout = 60000)
-	public void testConcurrentProducerConsumerTopology() throws Exception {
-		runSimpleConcurrentProducerConsumerTopology();
-	}
-
-//	@Test(timeout = 60000)
-//	public void testPunctuatedExplicitWMConsumer() throws Exception {
-//		runExplicitPunctuatedWMgeneratingConsumerTest(false);
-//	}
-
-//	@Test(timeout = 60000)
-//	public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-//		runExplicitPunctuatedWMgeneratingConsumerTest(true);
-//	}
-
-	@Test(timeout = 60000)
-	public void testKeyValueSupport() throws Exception {
-		runKeyValueTest();
-	}
-
-	// --- canceling / failures ---
-
-	@Test(timeout = 60000)
-	public void testCancelingEmptyTopic() throws Exception {
-		runCancelingOnEmptyInputTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testCancelingFullTopic() throws Exception {
-		runCancelingOnFullInputTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testFailOnDeploy() throws Exception {
-		runFailOnDeployTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testInvalidOffset() throws Exception {
-		
-		final int parallelism = 1;
-		
-		// write 20 messages into topic:
-		final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1);
-
-		// set invalid offset:
-		CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234);
-		curatorClient.close();
-
-		// read from topic
-		final int valuesCount = 20;
-		final int startFrom = 0;
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.getConfig().disableSysoutLogging();
-		
-		readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);
-
-		deleteTestTopic(topic);
-	}
-
-	// --- source to partition mappings and exactly once ---
-
-	@Test(timeout = 60000)
-	public void testOneToOneSources() throws Exception {
-		runOneToOneExactlyOnceTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testOneSourceMultiplePartitions() throws Exception {
-		runOneSourceMultiplePartitionsExactlyOnceTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testMultipleSourcesOnePartition() throws Exception {
-		runMultipleSourcesOnePartitionExactlyOnceTest();
-	}
-
-	// --- broker failure ---
-
-	@Test(timeout = 60000)
-	public void testBrokerFailure() throws Exception {
-		runBrokerFailureTest();
-	}
-
-	// --- offset committing ---
-
-	@Test(timeout = 60000)
-	public void testCommitOffsetsToZookeeper() throws Exception {
-		runCommitOffsetsToKafka();
-	}
-
-	@Test(timeout = 60000)
-	public void testStartFromZookeeperCommitOffsets() throws Exception {
-		runStartFromKafkaCommitOffsets();
-	}
-
-	@Test(timeout = 60000)
-	public void testAutoOffsetRetrievalAndCommitToZookeeper() throws Exception {
-		runAutoOffsetRetrievalAndCommitToKafka();
-	}
-
-	@Test
-	public void runOffsetManipulationInZooKeeperTest() {
-		try {
-			final String topicName = "ZookeeperOffsetHandlerTest-Topic";
-			final String groupId = "ZookeeperOffsetHandlerTest-Group";
-
-			final Long offset = (long) (Math.random() * Long.MAX_VALUE);
-
-			CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient();
-			kafkaServer.createTestTopic(topicName, 3, 2);
-
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset);
-
-			Long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
-
-			curatorFramework.close();
-
-			assertEquals(offset, fetchedOffset);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test(timeout = 60000)
-	public void testOffsetAutocommitTest() throws Exception {
-		final int parallelism = 3;
-
-		// write a sequence from 0 to 99 to each of the 3 partitions.
-		final String topicName = writeSequence("testOffsetAutocommit", 100, parallelism, 1);
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		// NOTE: We are not enabling the checkpointing!
-		env.getConfig().disableSysoutLogging();
-		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env.setParallelism(parallelism);
-
-		// the readSequence operation sleeps for 20 ms between each record.
-		// setting a delay of 25*20 = 500 for the commit interval makes
-		// sure that we commit roughly 3-4 times while reading, however
-		// at least once.
-		Properties readProps = new Properties();
-		readProps.putAll(standardProps);
-		readProps.setProperty("auto.commit.interval.ms", "500");
-
-		// read so that the offset can be committed to ZK
-		readSequence(env, readProps, parallelism, topicName, 100, 0);
-
-		// get the offset
-		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-
-		Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
-		Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
-		Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
-		curatorFramework.close();
-		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
-
-		// ensure that the offset has been committed
-		boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) ||
-			(o2 != null && o2 > 0 && o2 <= 100) ||
-			(o3 != null && o3 > 0 && o3 <= 100);
-		assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet);
-
-		deleteTestTopic(topicName);
-	}
-
-	// --- special executions ---
-
-	@Test(timeout = 60000)
-	public void testBigRecordJob() throws Exception {
-		runBigRecordTestTopology();
-	}
-
-	@Test(timeout = 60000)
-	public void testMultipleTopics() throws Exception {
-		runProduceConsumeMultipleTopics();
-	}
-
-	@Test(timeout = 60000)
-	public void testAllDeletes() throws Exception {
-		runAllDeletesTest();
-	}
-
-	@Test(timeout=60000)
-	public void testEndOfStream() throws Exception {
-		runEndOfStreamTest();
-	}
-
-	@Test(timeout = 60000)
-	public void testMetrics() throws Throwable {
-		runMetricsTest();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
deleted file mode 100644
index 6d0b140..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
-
-	@Override
-	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
-			final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
-		return new Kafka08JsonTableSink(topic, properties, partitioner) {
-			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
-					SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
-				return kafkaProducer;
-			}
-		};
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected SerializationSchema<Row> getSerializationSchema() {
-		return new JsonRowSerializationSchema(FIELD_NAMES);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
deleted file mode 100644
index a2d66ac..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import java.util.Properties;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
-
-	@Override
-	protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
-		return new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-		return (Class) JsonRowDeserializationSchema.class;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer08.class;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
deleted file mode 100644
index 5c951db..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class Kafka08ProducerITCase extends KafkaProducerTestBase {
-
-	@Test
-	public void testCustomPartitioning() {
-		runCustomPartitioningTest();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
deleted file mode 100644
index 9520f55..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Collections;
-import java.util.Properties;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.Test;
-
-public class KafkaConsumer08Test {
-
-	@Test
-	public void testValidateZooKeeperConfig() {
-		try {
-			// empty
-			Properties emptyProperties = new Properties();
-			try {
-				FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no connect string (only group string)
-			Properties noConnect = new Properties();
-			noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
-			try {
-				FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no group string (only connect string)
-			Properties noGroup = new Properties();
-			noGroup.put("zookeeper.connect", "localhost:47574");
-			try {
-				FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testCreateSourceWithoutCluster() {
-		try {
-			Properties props = new Properties();
-			props.setProperty("zookeeper.connect", "localhost:56794");
-			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
-			props.setProperty("group.id", "non-existent-group");
-
-			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
-			consumer.open(new Configuration());
-			fail();
-		}
-		catch (Exception e) {
-			assertTrue(e.getMessage().contains("Unable to retrieve any partitions"));
-		}
-	}
-
-	@Test
-	public void testAllBoostrapServerHostsAreInvalid() {
-		try {
-			String zookeeperConnect = "localhost:56794";
-			String bootstrapServers = "indexistentHost:11111";
-			String groupId = "non-existent-group";
-			Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
-			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
-					new SimpleStringSchema(), props);
-			consumer.open(new Configuration());
-			fail();
-		} catch (Exception e) {
-			assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!",
-					e.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
-							+ "' config are invalid"));
-		}
-	}
-
-	@Test
-	public void testAtLeastOneBootstrapServerHostIsValid() {
-		try {
-			String zookeeperConnect = "localhost:56794";
-			// we declare one valid boostrap server, namely the one with
-			// 'localhost'
-			String bootstrapServers = "indexistentHost:11111, localhost:22222";
-			String groupId = "non-existent-group";
-			Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
-			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
-					new SimpleStringSchema(), props);
-			consumer.open(new Configuration());
-			fail();
-		} catch (Exception e) {
-			// test is not failing because we have one valid boostrap server
-			assertTrue("The cause of the exception should not be 'all boostrap server are invalid'!",
-					!e.getMessage().contains("All the hosts provided in: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
-							+ " config are invalid"));
-		}
-	}
-	
-	private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) {
-		Properties props = new Properties();
-		props.setProperty("zookeeper.connect", zookeeperConnect);
-		props.setProperty("bootstrap.servers", bootstrapServers);
-		props.setProperty("group.id", groupId);
-		return props;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
deleted file mode 100644
index 72d2772..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaLocalSystemTime implements Time {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
-	@Override
-	public long milliseconds() {
-		return System.currentTimeMillis();
-	}
-
-	@Override
-	public long nanoseconds() {
-		return System.nanoTime();
-	}
-
-	@Override
-	public void sleep(long ms) {
-		try {
-			Thread.sleep(ms);
-		} catch (InterruptedException e) {
-			LOG.warn("Interruption", e);
-		}
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index 91fc286..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.concurrent.Future;
-
-
-import static org.mockito.Mockito.*;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import static org.junit.Assert.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FlinkKafkaProducerBase.class)
-public class KafkaProducerTest extends TestLogger {
-	
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testPropagateExceptions() {
-		try {
-			// mock kafka producer
-			KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
-			
-			// partition setup
-			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-				// returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
-				Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
-
-			// failure when trying to send an element
-			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
-				.thenAnswer(new Answer<Future<RecordMetadata>>() {
-					@Override
-					public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-						Callback callback = (Callback) invocation.getArguments()[1];
-						callback.onCompletion(null, new Exception("Test error"));
-						return null;
-					}
-				});
-			
-			// make sure the FlinkKafkaProducer instantiates our mock producer
-			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-			
-			// (1) producer that propagates errors
-
-			FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
-
-			OneInputStreamOperatorTestHarness<String, Object> testHarness =
-					new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
-
-			testHarness.open();
-
-			try {
-				testHarness.processElement(new StreamRecord<>("value"));
-				testHarness.processElement(new StreamRecord<>("value"));
-				fail("This should fail with an exception");
-			}
-			catch (Exception e) {
-				assertNotNull(e.getCause());
-				assertNotNull(e.getCause().getMessage());
-				assertTrue(e.getCause().getMessage().contains("Test error"));
-			}
-
-			testHarness.close();
-
-			// (2) producer that only logs errors
-
-			FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
-			producerLogging.setLogFailuresOnly(true);
-
-			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
-
-			testHarness.open();
-
-			testHarness.processElement(new StreamRecord<>("value"));
-			testHarness.processElement(new StreamRecord<>("value"));
-
-			testHarness.close();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}