Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:26 UTC

[36/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
new file mode 100644
index 0000000..d015157
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.common.TopicAndPartition;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.kafka.common.Node;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.8 low-level consumer API.
+ * The fetcher also handles the explicit communication with ZooKeeper to fetch initial offsets
+ * and to write offsets to ZooKeeper.
+ *
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
+	
+	static final KafkaTopicPartitionState<TopicAndPartition> MARKER = 
+			new KafkaTopicPartitionState<>(new KafkaTopicPartition("n/a", -1), new TopicAndPartition("n/a", -1));
+
+	private static final Logger LOG = LoggerFactory.getLogger(Kafka08Fetcher.class);
+
+	// ------------------------------------------------------------------------
+
+	/** The schema to convert between Kafka's byte messages and Flink's objects */
+	private final KeyedDeserializationSchema<T> deserializer;
+
+	/** The properties that configure the Kafka connection */
+	private final Properties kafkaConfig;
+
+	/** The subtask's runtime context */
+	private final RuntimeContext runtimeContext;
+
+	/** The queue of partitions that are currently not assigned to a broker connection */
+	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;
+
+	/** The behavior to use in case an offset is no longer valid for a partition */
+	private final long invalidOffsetBehavior;
+
+	/** The interval in which to automatically commit (-1 if deactivated) */
+	private final long autoCommitInterval;
+
+	/** The handler that reads/writes offsets from/to ZooKeeper */
+	private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
+
+	/** Flag to track the main work loop as alive */
+	private volatile boolean running = true;
+
+
+	public Kafka08Fetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> assignedPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			StreamingRuntimeContext runtimeContext,
+			KeyedDeserializationSchema<T> deserializer,
+			Properties kafkaProperties,
+			long invalidOffsetBehavior,
+			long autoCommitInterval,
+			boolean useMetrics) throws Exception
+	{
+		super(
+				sourceContext,
+				assignedPartitions,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				runtimeContext.getProcessingTimeService(),
+				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+				runtimeContext.getUserCodeClassLoader(),
+				useMetrics);
+
+		this.deserializer = checkNotNull(deserializer);
+		this.kafkaConfig = checkNotNull(kafkaProperties);
+		this.runtimeContext = runtimeContext;
+		this.invalidOffsetBehavior = invalidOffsetBehavior;
+		this.autoCommitInterval = autoCommitInterval;
+		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
+
+		// initially, all these partitions are not assigned to a specific broker connection
+		for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+			unassignedPartitionsQueue.add(partition);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Main Work Loop
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void runFetchLoop() throws Exception {
+		// the map from broker to the thread that is connected to that broker
+		final Map<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<>();
+
+		// this holds the possible exceptions from the concurrent broker connection threads
+		final ExceptionProxy errorHandler = new ExceptionProxy(Thread.currentThread());
+
+		// the offset handler handles the communication with ZooKeeper, to commit externally visible offsets
+		final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig);
+		this.zookeeperOffsetHandler = zookeeperOffsetHandler;
+
+		PeriodicOffsetCommitter periodicCommitter = null;
+		try {
+			// read offsets from ZooKeeper for partitions that did not restore offsets
+			{
+				List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
+				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+					if (!partition.isOffsetDefined()) {
+						partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
+					}
+				}
+
+				Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
+				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+					Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition());
+					if (zkOffset != null) {
+						// the offset in ZK represents the "next record to process", so we need to subtract 1 from it
+						// to correctly represent our internally checkpointed offsets
+						partition.setOffset(zkOffset - 1);
+					}
+				}
+			}
+
+			// start the periodic offset committer thread, if necessary
+			if (autoCommitInterval > 0) {
+				LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval);
+
+				periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler, 
+						subscribedPartitions(), errorHandler, autoCommitInterval);
+				periodicCommitter.setName("Periodic Kafka partition offset committer");
+				periodicCommitter.setDaemon(true);
+				periodicCommitter.start();
+			}
+
+			// register offset metrics
+			if (useMetrics) {
+				final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
+				addOffsetStateGauge(kafkaMetricGroup);
+			}
+
+			// Main loop: polls partitions from the unassignedPartitions queue and hands them to the broker threads
+			while (running) {
+				// re-throw any exception from the concurrent fetcher threads
+				errorHandler.checkAndThrowException();
+
+				// wait for max 5 seconds trying to get partitions to assign
+				// if threads shut down, this poll returns earlier, because the threads inject the
+				// special marker into the queue
+				List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign = 
+						unassignedPartitionsQueue.getBatchBlocking(5000);
+				partitionsToAssign.remove(MARKER);
+
+				if (!partitionsToAssign.isEmpty()) {
+					LOG.info("Assigning {} partitions to broker threads", partitionsToAssign.size());
+					Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders = 
+							findLeaderForPartitions(partitionsToAssign, kafkaConfig);
+
+					// assign the partitions to the leaders (maybe start the threads)
+					for (Map.Entry<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader : 
+							partitionsWithLeaders.entrySet())
+					{
+						final Node leader = partitionsWithLeader.getKey();
+						final List<KafkaTopicPartitionState<TopicAndPartition>> partitions = partitionsWithLeader.getValue();
+						SimpleConsumerThread<T> brokerThread = brokerToThread.get(leader);
+
+						if (!running) {
+							break;
+						}
+
+						if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) {
+							// start new thread
+							brokerThread = createAndStartSimpleConsumerThread(partitions, leader, errorHandler);
+							brokerToThread.put(leader, brokerThread);
+						}
+						else {
+							// put elements into queue of thread
+							ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue = 
+									brokerThread.getNewPartitionsQueue();
+							
+							for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
+								if (!newPartitionsQueue.addIfOpen(fp)) {
+									// we were unable to add the partition to the broker's queue
+									// the broker has closed in the meantime (the thread will shut down)
+									// create a new thread for connecting to this broker
+									List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<>();
+									seedPartitions.add(fp);
+									brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler);
+									brokerToThread.put(leader, brokerThread);
+									newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions
+								}
+							}
+						}
+					}
+				}
+				else {
+					// there were no partitions to assign. Check if any broker threads shut down.
+					// we get into this section of the code, if either the poll timed out, or the
+					// blocking poll was woken up by the marker element
+					Iterator<SimpleConsumerThread<T>> bttIterator = brokerToThread.values().iterator();
+					while (bttIterator.hasNext()) {
+						SimpleConsumerThread<T> thread = bttIterator.next();
+						if (!thread.getNewPartitionsQueue().isOpen()) {
+							LOG.info("Removing stopped consumer thread {}", thread.getName());
+							bttIterator.remove();
+						}
+					}
+				}
+
+				if (brokerToThread.size() == 0 && unassignedPartitionsQueue.isEmpty()) {
+					if (unassignedPartitionsQueue.close()) {
+						LOG.info("All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher");
+						break;
+					}
+					// we end up here if somebody added something to the queue in the meantime --> continue to poll queue again
+				}
+			}
+		}
+		catch (InterruptedException e) {
+			// this may be thrown because an exception on one of the concurrent fetcher threads
+			// woke this thread up. make sure we throw the root exception instead in that case
+			errorHandler.checkAndThrowException();
+
+			// no other root exception, throw the interrupted exception
+			throw e;
+		}
+		finally {
+			this.running = false;
+			this.zookeeperOffsetHandler = null;
+
+			// if we run a periodic committer thread, shut that down
+			if (periodicCommitter != null) {
+				periodicCommitter.shutdown();
+			}
+
+			// clear the interruption flag
+			// this allows the joining on the consumer threads (on a best-effort basis) to happen
+			// even in case this thread was already interrupted
+			Thread.interrupted();
+
+			// make sure that in any case (completion, abort, error), all spawned threads are stopped
+			try {
+				int runningThreads;
+				do {
+					// check whether threads are alive and cancel them
+					runningThreads = 0;
+					Iterator<SimpleConsumerThread<T>> threads = brokerToThread.values().iterator();
+					while (threads.hasNext()) {
+						SimpleConsumerThread<?> t = threads.next();
+						if (t.isAlive()) {
+							t.cancel();
+							runningThreads++;
+						} else {
+							threads.remove();
+						}
+					}
+
+					// wait for the threads to finish, before issuing a cancel call again
+					if (runningThreads > 0) {
+						for (SimpleConsumerThread<?> t : brokerToThread.values()) {
+							t.join(500 / runningThreads + 1);
+						}
+					}
+				}
+				while (runningThreads > 0);
+			}
+			catch (InterruptedException ignored) {
+				// waiting for the thread shutdown apparently got interrupted
+				// restore interrupted state and continue
+				Thread.currentThread().interrupt();
+			}
+			catch (Throwable t) {
+				// we catch all here to preserve the original exception
+				LOG.error("Exception while shutting down consumer threads", t);
+			}
+
+			try {
+				zookeeperOffsetHandler.close();
+			}
+			catch (Throwable t) {
+				// we catch all here to preserve the original exception
+				LOG.error("Exception while shutting down ZookeeperOffsetHandler", t);
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		// signal the main thread to exit
+		this.running = false;
+
+		// make sure the main thread wakes up soon
+		this.unassignedPartitionsQueue.addIfOpen(MARKER);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Kafka 0.8 specific class instantiation
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
+		return new TopicAndPartition(partition.getTopic(), partition.getPartition());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Offset handling
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
+		if (zkHandler != null) {
+			// the ZK handler takes care of incrementing the offsets by 1 before committing
+			zkHandler.prepareAndCommitOffsets(offsets);
+		}
+
+		// Set committed offsets in topic partition state
+		KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitions();
+		for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
+			Long offset = offsets.get(partition.getKafkaTopicPartition());
+			if (offset != null) {
+				partition.setCommittedOffset(offset);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
+			List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
+			Node leader,
+			ExceptionProxy errorHandler) throws IOException, ClassNotFoundException
+	{
+		// each thread needs its own copy of the deserializer, because the deserializer is
+		// not necessarily thread safe
+		final KeyedDeserializationSchema<T> clonedDeserializer =
+				InstantiationUtil.clone(deserializer, runtimeContext.getUserCodeClassLoader());
+
+		// seed the thread with the list of fetch partitions (otherwise it would shut down immediately again)
+		SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>(
+				this, errorHandler, kafkaConfig, leader, seedPartitions, unassignedPartitionsQueue, 
+				clonedDeserializer, invalidOffsetBehavior);
+
+		brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
+				runtimeContext.getTaskName(), leader.id(), leader.host(), leader.port()));
+		brokerThread.setDaemon(true);
+		brokerThread.start();
+
+		LOG.info("Starting thread {}", brokerThread.getName());
+		return brokerThread;
+	}
+
+	/**
+	 * Returns a list of unique topics for the given partitions.
+	 *
+	 * @param partitions The partitions
+	 * @return A list of unique topics
+	 */
+	private static List<String> getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
+		HashSet<String> uniqueTopics = new HashSet<>();
+		for (KafkaTopicPartitionState<TopicAndPartition> fp: partitions) {
+			uniqueTopics.add(fp.getTopic());
+		}
+		return new ArrayList<>(uniqueTopics);
+	}
+
+	/**
+	 * Find leaders for the partitions
+	 *
+	 * From a high level, the method does the following:
+	 *	 - Get a list of FetchPartitions (usually only a few partitions)
+	 *	 - Get the list of topics from the FetchPartitions list and request the partitions for the topics. (Kafka doesn't support getting leaders for a set of partitions)
+	 *	 - Build a Map<Leader, List<FetchPartition>> where only the requested partitions are contained.
+	 *
+	 * @param partitionsToAssign fetch partitions list
+	 * @return leader to partitions map
+	 */
+	private static Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(
+			List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign,
+			Properties kafkaProperties) throws Exception
+	{
+		if (partitionsToAssign.isEmpty()) {
+			throw new IllegalArgumentException("Leader request for empty partitions list");
+		}
+
+		LOG.info("Refreshing leader information for partitions {}", partitionsToAssign);
+		
+		// this request is based on the topic names
+		PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(getTopics(partitionsToAssign), kafkaProperties);
+		infoFetcher.start();
+
+		// NOTE: The kafka client apparently locks itself up sometimes
+		// when it is interrupted, so we run it only in a separate thread.
+		// since it sometimes refuses to shut down, we resort to the admittedly harsh
+		// means of killing the thread after a timeout.
+		KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
+		watchDog.start();
+
+		// this list contains ALL partitions of the requested topics
+		List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
+
+		// copy list to track unassigned partitions
+		List<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions = new ArrayList<>(partitionsToAssign);
+
+		// final mapping from leader -> list(fetchPartition)
+		Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> leaderToPartitions = new HashMap<>();
+
+		for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
+			if (unassignedPartitions.size() == 0) {
+				// we are done: all partitions are assigned
+				break;
+			}
+
+			Iterator<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsIterator = unassignedPartitions.iterator();
+			while (unassignedPartitionsIterator.hasNext()) {
+				KafkaTopicPartitionState<TopicAndPartition> unassignedPartition = unassignedPartitionsIterator.next();
+
+				if (unassignedPartition.getKafkaTopicPartition().equals(partitionLeader.getTopicPartition())) {
+					// we found the leader for one of the fetch partitions
+					Node leader = partitionLeader.getLeader();
+
+					List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = leaderToPartitions.get(leader);
+					if (partitionsOfLeader == null) {
+						partitionsOfLeader = new ArrayList<>();
+						leaderToPartitions.put(leader, partitionsOfLeader);
+					}
+					partitionsOfLeader.add(unassignedPartition);
+					unassignedPartitionsIterator.remove(); // partition has been assigned
+					break;
+				}
+			}
+		}
+
+		if (unassignedPartitions.size() > 0) {
+			throw new RuntimeException("Unable to find a leader for partitions: " + unassignedPartitions);
+		}
+
+		LOG.debug("Partitions with assigned leaders {}", leaderToPartitions);
+
+		return leaderToPartitions;
+	}
+}

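For orientation: this fetcher is the internal engine behind the user-facing FlinkKafkaConsumer08 source, which creates it and drives runFetchLoop(). A minimal sketch of that user-facing side (broker address, ZooKeeper address, group id, and topic name are illustrative placeholders, not part of this commit):

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Kafka08ConsumerExample {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");   // Kafka 0.8 broker list
            props.setProperty("zookeeper.connect", "localhost:2181");   // offsets are read from / committed to ZooKeeper
            props.setProperty("group.id", "example-group");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.addSource(new FlinkKafkaConsumer08<>("example-topic", new SimpleStringSchema(), props))
                .print();
            env.execute("Kafka 0.8 consumer example");
        }
    }
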
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
new file mode 100644
index 0000000..4d61e53
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * A watch dog thread that forcibly kills another thread, if that thread does not
+ * finish in time.
+ * 
+ * <p>This uses the discouraged {@link Thread#stop()} method. While this is not
+ * advisable, this watch dog is only used in extreme cases of threads that simply
+ * do not terminate otherwise.
+ */
+class KillerWatchDog extends Thread {
+
+	private final Thread toKill;
+	private final long timeout;
+
+	KillerWatchDog(Thread toKill, long timeout) {
+		super("KillerWatchDog");
+		setDaemon(true);
+
+		this.toKill = toKill;
+		this.timeout = timeout;
+	}
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public void run() {
+		final long deadline = System.currentTimeMillis() + timeout;
+		long now;
+
+		while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
+			try {
+				toKill.join(deadline - now);
+			}
+			catch (InterruptedException e) {
+				// ignore here, our job is important!
+			}
+		}
+
+		// this is harsh, but this watchdog is a last resort
+		if (toKill.isAlive()) {
+			toKill.stop();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
new file mode 100644
index 0000000..d8d927d
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+
+import java.util.List;
+import java.util.Properties;
+
+class PartitionInfoFetcher extends Thread {
+
+	private final List<String> topics;
+	private final Properties properties;
+
+	private volatile List<KafkaTopicPartitionLeader> result;
+	private volatile Throwable error;
+
+
+	PartitionInfoFetcher(List<String> topics, Properties properties) {
+		this.topics = topics;
+		this.properties = properties;
+	}
+
+	@Override
+	public void run() {
+		try {
+			result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
+		}
+		catch (Throwable t) {
+			this.error = t;
+		}
+	}
+
+	public List<KafkaTopicPartitionLeader> getPartitions() throws Exception {
+		try {
+			this.join();
+		}
+		catch (InterruptedException e) {
+			throw new Exception("Partition fetching was cancelled before completion");
+		}
+
+		if (error != null) {
+			throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error);
+		}
+		if (result != null) {
+			return result;
+		}
+		throw new Exception("Partition fetching failed");
+	}
+}
\ No newline at end of file

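The fetcher pairs this helper with the KillerWatchDog above, because the Kafka 0.8 client can lock up when interrupted. A condensed sketch of that pattern, as used in Kafka08Fetcher#findLeaderForPartitions (the topic name and the kafkaProperties variable are placeholders):

    PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(
            Collections.singletonList("example-topic"), kafkaProperties);
    infoFetcher.start();

    // hard-kill the fetcher thread if the Kafka client does not return within 60 seconds
    new KillerWatchDog(infoFetcher, 60000).start();

    // joins the thread and rethrows any error it recorded
    List<KafkaTopicPartitionLeader> partitionsWithLeaders = infoFetcher.getPartitions();
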
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
new file mode 100644
index 0000000..27d90f2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A thread that periodically writes the current Kafka partition offsets to Zookeeper.
+ */
+public class PeriodicOffsetCommitter extends Thread {
+
+	/** The ZooKeeper handler */
+	private final ZookeeperOffsetHandler offsetHandler;
+	
+	private final KafkaTopicPartitionState<?>[] partitionStates;
+	
+	/** The proxy to forward exceptions to the main thread */
+	private final ExceptionProxy errorHandler;
+	
+	/** Interval in which to commit, in milliseconds */
+	private final long commitInterval;
+	
+	/** Flag to mark the periodic committer as running */
+	private volatile boolean running = true;
+
+	PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler,
+			KafkaTopicPartitionState<?>[] partitionStates,
+			ExceptionProxy errorHandler,
+			long commitInterval)
+	{
+		this.offsetHandler = checkNotNull(offsetHandler);
+		this.partitionStates = checkNotNull(partitionStates);
+		this.errorHandler = checkNotNull(errorHandler);
+		this.commitInterval = commitInterval;
+		
+		checkArgument(commitInterval > 0);
+	}
+
+	@Override
+	public void run() {
+		try {
+			while (running) {
+				Thread.sleep(commitInterval);
+
+				// create a deep copy of the current offsets
+				HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length);
+				for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
+					offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
+				}
+				
+				offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
+			}
+		}
+		catch (Throwable t) {
+			if (running) {
+				errorHandler.reportError(
+						new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
+			}
+		}
+	}
+
+	public void shutdown() {
+		this.running = false;
+		this.interrupt();
+	}
+}

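A short lifecycle sketch, mirroring how Kafka08Fetcher drives this committer (the 5000 ms interval is an example value):

    PeriodicOffsetCommitter committer = new PeriodicOffsetCommitter(
            zookeeperOffsetHandler, subscribedPartitions(), errorHandler, 5000L);
    committer.setName("Periodic Kafka partition offset committer");
    committer.setDaemon(true);
    committer.start();

    // ... on fetcher shutdown:
    committer.shutdown();   // clears the running flag and interrupts the sleep
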
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
new file mode 100644
index 0000000..35e491a
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.kafka.common.Node;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.PropertiesUtil.getInt;
+
+/**
+ * This class implements a thread with a connection to a single Kafka broker. The thread
+ * pulls records for a set of topic partitions for which the connected broker is currently
+ * the leader. The thread deserializes these records and emits them. 
+ * 
+ * @param <T> The type of elements that this consumer thread creates from Kafka's byte messages
+ *            and emits into the Flink DataStream.
+ */
+class SimpleConsumerThread<T> extends Thread {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class);
+
+	private static final KafkaTopicPartitionState<TopicAndPartition> MARKER = Kafka08Fetcher.MARKER;
+	
+	// ------------------------------------------------------------------------
+
+	private final Kafka08Fetcher<T> owner;
+	
+	private final KeyedDeserializationSchema<T> deserializer;
+
+	private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions;
+
+	private final Node broker;
+
+	/** Queue containing new fetch partitions for the consumer thread */
+	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue;
+	
+	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions;
+	
+	private final ExceptionProxy errorHandler;
+	
+	private final long invalidOffsetBehavior;
+	
+	private volatile boolean running = true;
+	
+
+	// ----------------- Simple Consumer ----------------------
+	private volatile SimpleConsumer consumer;
+
+	private final int soTimeout;
+	private final int minBytes;
+	private final int maxWait;
+	private final int fetchSize;
+	private final int bufferSize;
+	private final int reconnectLimit;
+
+
+	// exceptions are thrown locally
+	public SimpleConsumerThread(
+			Kafka08Fetcher<T> owner,
+			ExceptionProxy errorHandler,
+			Properties config,
+			Node broker,
+			List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
+			ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions,
+			KeyedDeserializationSchema<T> deserializer,
+			long invalidOffsetBehavior)
+	{
+		this.owner = owner;
+		this.errorHandler = errorHandler;
+		this.broker = broker;
+		this.partitions = seedPartitions;
+		this.deserializer = requireNonNull(deserializer);
+		this.unassignedPartitions = requireNonNull(unassignedPartitions);
+		this.newPartitionsQueue = new ClosableBlockingQueue<>();
+		this.invalidOffsetBehavior = invalidOffsetBehavior;
+		
+		// these are the actual configuration values of Kafka + their original default values.
+		this.soTimeout = getInt(config, "socket.timeout.ms", 30000);
+		this.minBytes = getInt(config, "fetch.min.bytes", 1);
+		this.maxWait = getInt(config, "fetch.wait.max.ms", 100);
+		this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
+		this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
+		this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+	}
+
+	public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {
+		return newPartitionsQueue;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  main work loop
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void run() {
+		LOG.info("Starting to fetch from {}", this.partitions);
+
+		// set up the config values
+		final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
+
+		try {
+			// create the Kafka consumer that we actually use for fetching
+			consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+			
+			// make sure that all partitions have some offsets to start with
+			// those partitions that do not have an offset from a checkpoint or from ZooKeeper
+			// need to get their start offset from Kafka
+			getMissingOffsetsFromKafka(partitions);
+
+			// Now, the actual work starts :-)
+			int offsetOutOfRangeCount = 0;
+			int reconnects = 0;
+			while (running) {
+
+				// ----------------------------------- partitions list maintenance ----------------------------
+
+				// check queue for new partitions to read from:
+				List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch();
+				if (newPartitions != null) {
+					// found some new partitions for this thread's broker
+					
+					// check if the new partitions need an offset lookup
+					getMissingOffsetsFromKafka(newPartitions);
+					
+					// add the new partitions (and check they are not already in there)
+					for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
+						if (partitions.contains(newPartition)) {
+							throw new IllegalStateException("Adding partition " + newPartition + 
+									" to subscribed partitions even though it is already subscribed");
+						}
+						partitions.add(newPartition);
+					}
+					
+					LOG.info("Adding {} new partitions to consumer thread {}", newPartitions.size(), getName());
+					LOG.debug("Partitions list: {}", newPartitions);
+				}
+
+				if (partitions.size() == 0) {
+					if (newPartitionsQueue.close()) {
+						// close succeeded. Closing thread
+						running = false;
+						
+						LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.", 
+								getName());
+
+						// add the wake-up marker into the queue to make the main thread
+						// wake up immediately and detect the shutdown faster
+						unassignedPartitions.add(MARKER);
+
+						break;
+					} else {
+						// close failed: fetcher main thread concurrently added new partitions into the queue.
+						// go to top of loop again and get the new partitions
+						continue; 
+					}
+				}
+
+				// ----------------------------------- request / response with kafka ----------------------------
+
+				FetchRequestBuilder frb = new FetchRequestBuilder();
+				frb.clientId(clientId);
+				frb.maxWait(maxWait);
+				frb.minBytes(minBytes);
+
+				for (KafkaTopicPartitionState<?> partition : partitions) {
+					frb.addFetch(
+							partition.getKafkaTopicPartition().getTopic(),
+							partition.getKafkaTopicPartition().getPartition(),
+							partition.getOffset() + 1, // request the next record
+							fetchSize);
+				}
+				
+				kafka.api.FetchRequest fetchRequest = frb.build();
+				LOG.debug("Issuing fetch request {}", fetchRequest);
+
+				FetchResponse fetchResponse;
+				try {
+					fetchResponse = consumer.fetch(fetchRequest);
+				}
+				catch (Throwable cce) {
+					//noinspection ConstantConditions
+					if (cce instanceof ClosedChannelException) {
+						LOG.warn("Fetch failed because of ClosedChannelException.");
+						LOG.debug("Full exception", cce);
+						
+						// we don't know if the broker is overloaded or unavailable.
+						// retry a few times, then return ALL partitions for new leader lookup
+						if (++reconnects >= reconnectLimit) {
+							LOG.warn("Unable to reach broker after {} retries. Returning all current partitions", reconnectLimit);
+							for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) {
+								unassignedPartitions.add(fp);
+							}
+							this.partitions.clear();
+							continue; // jump to top of loop: will close thread or subscribe to new partitions
+						}
+						try {
+							consumer.close();
+						} catch (Throwable t) {
+							LOG.warn("Error while closing consumer connection", t);
+						}
+						// delay & retry
+						Thread.sleep(100);
+						consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+						continue; // retry
+					} else {
+						throw cce;
+					}
+				}
+				reconnects = 0;
+
+				// ---------------------------------------- error handling ----------------------------
+
+				if (fetchResponse == null) {
+					throw new IOException("Fetch from Kafka failed (request returned null)");
+				}
+				
+				if (fetchResponse.hasError()) {
+					String exception = "";
+					List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
+					
+					// iterate over partitions to get individual error codes
+					Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
+					boolean partitionsRemoved = false;
+					
+					while (partitionsIterator.hasNext()) {
+						final KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next();
+						short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition());
+
+						if (code == ErrorMapping.OffsetOutOfRangeCode()) {
+							// we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
+							// Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
+							partitionsToGetOffsetsFor.add(fp);
+						}
+						else if (code == ErrorMapping.NotLeaderForPartitionCode() ||
+								code == ErrorMapping.LeaderNotAvailableCode() ||
+								code == ErrorMapping.BrokerNotAvailableCode() ||
+								code == ErrorMapping.UnknownCode())
+						{
+							// the broker we are connected to is not the leader for the partition.
+							LOG.warn("{} is not the leader of {}. Reassigning leader for partition", broker, fp);
+							LOG.debug("Error code = {}", code);
+
+							unassignedPartitions.add(fp);
+
+							partitionsIterator.remove(); // unsubscribe the partition ourselves
+							partitionsRemoved = true;
+						}
+						else if (code != ErrorMapping.NoError()) {
+							exception += "\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " +
+									StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+						}
+					}
+					if (partitionsToGetOffsetsFor.size() > 0) {
+						// safeguard against an infinite loop.
+						if (offsetOutOfRangeCount++ > 3) {
+							throw new RuntimeException("Found invalid offsets more than three times in partitions "
+									+ partitionsToGetOffsetsFor + " Exceptions: " + exception);
+						}
+						// get valid offsets for these partitions and try again.
+						LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
+						getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
+						
+						LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
+						continue; // jump back to create a new fetch request. The offset has not been touched.
+					}
+					else if (partitionsRemoved) {
+						continue; // create new fetch request
+					}
+					else {
+						// partitions failed on an error
+						throw new IOException("Error while fetching from broker '" + broker +"': " + exception);
+					}
+				} else {
+					// successful fetch, reset offsetOutOfRangeCount.
+					offsetOutOfRangeCount = 0;
+				}
+
+				// ----------------------------------- process fetch response ----------------------------
+
+				int messagesInFetch = 0;
+				int deletedMessages = 0;
+				Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
+				
+				partitionsLoop:
+				while (partitionsIterator.hasNext()) {
+					final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next();
+					
+					final ByteBufferMessageSet messageSet = fetchResponse.messageSet(
+							currentPartition.getTopic(), currentPartition.getPartition());
+
+					for (MessageAndOffset msg : messageSet) {
+						if (running) {
+							messagesInFetch++;
+							final ByteBuffer payload = msg.message().payload();
+							final long offset = msg.offset();
+							
+							if (offset <= currentPartition.getOffset()) {
+								// we have seen this message already
+								LOG.info("Skipping message with offset " + msg.offset()
+										+ " because we have seen messages until (including) "
+										+ currentPartition.getOffset()
+										+ " from topic/partition " + currentPartition.getTopic() + '/'
+										+ currentPartition.getPartition() + " already");
+								continue;
+							}
+
+							// If the message value is null, this represents a delete command for the message key.
+							// Log this and pass it on to the client who might want to also receive delete messages.
+							byte[] valueBytes;
+							if (payload == null) {
+								deletedMessages++;
+								valueBytes = null;
+							} else {
+								valueBytes = new byte[payload.remaining()];
+								payload.get(valueBytes);
+							}
+
+							// put key into byte array
+							byte[] keyBytes = null;
+							int keySize = msg.message().keySize();
+
+							if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization
+								ByteBuffer keyPayload = msg.message().key();
+								keyBytes = new byte[keySize];
+								keyPayload.get(keyBytes);
+							}
+
+							final T value = deserializer.deserialize(keyBytes, valueBytes, 
+									currentPartition.getTopic(), currentPartition.getPartition(), offset);
+							
+							if (deserializer.isEndOfStream(value)) {
+								// remove partition from subscribed partitions.
+								partitionsIterator.remove();
+								continue partitionsLoop;
+							}
+							
+							owner.emitRecord(value, currentPartition, offset);
+						}
+						else {
+							// no longer running
+							return;
+						}
+					}
+				}
+				LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages);
+			} // end of fetch loop
+
+			if (!newPartitionsQueue.close()) {
+				throw new Exception("Bug: Cleanly leaving fetcher thread without having a closed queue.");
+			}
+		}
+		catch (Throwable t) {
+			// report to the fetcher's error handler
+			errorHandler.reportError(t);
+		}
+		finally {
+			if (consumer != null) {
+				// closing the consumer should not fail the program
+				try {
+					consumer.close();
+				}
+				catch (Throwable t) {
+					LOG.error("Error while closing the Kafka simple consumer", t);
+				}
+			}
+		}
+	}
+
+	private void getMissingOffsetsFromKafka(
+			List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
+	{
+		// collect which partitions we should fetch offsets for
+		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
+		for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+			if (!part.isOffsetDefined()) {
+				// retrieve the offset from the consumer
+				partitionsToGetOffsetsFor.add(part);
+			}
+		}
+		
+		if (partitionsToGetOffsetsFor.size() > 0) {
+			getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
+			
+			LOG.info("No checkpoint/savepoint offsets found for some partitions. " +
+					"Fetched the following start offsets {}", partitionsToGetOffsetsFor);
+		}
+	}
+
+	/**
+	 * Cancels this fetch thread. The thread will release all resources and terminate.
+	 */
+	public void cancel() {
+		this.running = false;
+
+		// interrupt whatever the consumer is doing
+		if (consumer != null) {
+			consumer.close();
+		}
+
+		this.interrupt();
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Kafka Request Utils
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Request latest offsets for a set of partitions, via a Kafka consumer.
+	 *
+	 * <p>This method retries three times if the response has an error.
+	 *
+	 * @param consumer The consumer connected to lead broker
+	 * @param partitions The list of partitions we need offsets for
+	 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
+	 */
+	private static void getLastOffsetFromKafka(
+			SimpleConsumer consumer,
+			List<KafkaTopicPartitionState<TopicAndPartition>> partitions,
+			long whichTime) throws IOException
+	{
+		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+		for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+			requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1));
+		}
+
+		int retries = 0;
+		OffsetResponse response;
+		while (true) {
+			kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+					requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+			response = consumer.getOffsetsBefore(request);
+
+			if (response.hasError()) {
+				StringBuilder exception = new StringBuilder();
+				for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+					short code;
+					if ((code = response.errorCode(part.getTopic(), part.getPartition())) != ErrorMapping.NoError()) {
+						exception.append("\nException for topic=").append(part.getTopic())
+								.append(" partition=").append(part.getPartition()).append(": ")
+								.append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
+					}
+				}
+				if (++retries >= 3) {
+					throw new IOException("Unable to get last offset for partitions " + partitions + ": "
+							+ exception.toString());
+				} else {
+					LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception);
+				}
+			} else {
+				break; // leave retry loop
+			}
+		}
+
+		for (KafkaTopicPartitionState<TopicAndPartition> part: partitions) {
+			final long offset = response.offsets(part.getTopic(), part.getPartition())[0];
+			
+			// the offset returned is that of the next record to fetch. because our state reflects the latest
+			// successfully emitted record, we subtract one
+			part.setOffset(offset - 1);
+		}
+	}
+}
\ No newline at end of file

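The offset arithmetic in this thread follows one convention: the partition state always holds the offset of the last record emitted to Flink. A small worked sketch of that convention (the topic, partition, and offset values are assumed for illustration):

    KafkaTopicPartitionState<TopicAndPartition> state = new KafkaTopicPartitionState<>(
            new KafkaTopicPartition("example-topic", 0), new TopicAndPartition("example-topic", 0));

    state.setOffset(41L);                            // last record emitted to Flink had offset 41
    long nextFetchOffset = state.getOffset() + 1;    // the next fetch request therefore asks for offset 42
    // after record 42 is deserialized and emitted, emitRecord(...) advances the state to 42;
    // a subsequent ZooKeeper commit then writes 43, the "next record to process"
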
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
new file mode 100644
index 0000000..8f2ef09
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.utils.ZKGroupTopicDirs;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Handler for committing Kafka offsets to ZooKeeper and retrieving them again.
+ */
+public class ZookeeperOffsetHandler {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
+
+	private final String groupId;
+
+	private final CuratorFramework curatorClient;
+
+
+	public ZookeeperOffsetHandler(Properties props) {
+		this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+		if (this.groupId == null) {
+			throw new IllegalArgumentException("Required property '"
+					+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
+		}
+		
+		String zkConnect = props.getProperty("zookeeper.connect");
+		if (zkConnect == null) {
+			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
+		}
+
+		// we use Curator's default timeouts
+		int sessionTimeoutMs =  Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
+		int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
+		
+		// undocumented config options allowing users to configure the retry policy (they are "flink." prefixed as they are not official Kafka configs)
+		int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
+		int backoffMaxRetries =  Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
+		
+		RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
+		curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
+		curatorClient.start();
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Offset access and manipulation
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Commits offsets for Kafka partitions to ZooKeeper. The offsets given to this method should be the offsets of
+	 * the last processed records; this method takes care of incrementing the offsets by 1 before committing them, so
+	 * that the offsets committed to ZooKeeper represent the next record to process.
+	 * 
+	 * @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit.
+	 * @throws Exception The method forwards exceptions.
+	 */
+	public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception {
+		for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
+			KafkaTopicPartition tp = entry.getKey();
+
+			Long lastProcessedOffset = entry.getValue();
+			if (lastProcessedOffset != null && lastProcessedOffset >= 0) {
+				setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
+			}
+		}
+	}
+
+	/**
+	 * @param partitions The partitions to read offsets for.
+	 * @return The mapping from partition to offset.
+	 * @throws Exception This method forwards exceptions.
+	 */
+	public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
+		Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size());
+		for (KafkaTopicPartition tp : partitions) {
+			Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
+
+			if (offset != null) {
+				LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.",
+						tp.getTopic(), tp.getPartition(), offset);
+				ret.put(tp, offset);
+			}
+		}
+		return ret;
+	}
+
+	/**
+	 * Closes the offset handler.
+	 * 
+	 * @throws IOException Thrown, if the handler cannot be closed properly.
+	 */
+	public void close() throws IOException {
+		curatorClient.close();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Communication with Zookeeper
+	// ------------------------------------------------------------------------
+	
+	public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
+		String path = topicDirs.consumerOffsetDir() + "/" + partition;
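+		// with Kafka's standard ZooKeeper layout, this path is typically /consumers/<groupId>/offsets/<topic>/<partition>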
+		curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
+		byte[] data = Long.toString(offset).getBytes();
+		curatorClient.setData().forPath(path, data);
+	}
+
+	public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
+		String path = topicDirs.consumerOffsetDir() + "/" + partition;
+		curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
+		
+		byte[] data = curatorClient.getData().forPath(path);
+		
+		if (data == null) {
+			return null;
+		} else {
+			String asString = new String(data);
+			if (asString.length() == 0) {
+				return null;
+			} else {
+				try {
+					return Long.valueOf(asString);
+				}
+				catch (NumberFormatException e) {
+					LOG.error(
+							"The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
+							groupId, topic, partition, asString);
+					return null;
+				}
+			}
+		}
+	}
+}
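
A minimal, self-contained sketch (not part of this commit) of how a job would supply the
ZooKeeper-related consumer properties read by the handler above, including the undocumented
"flink."-prefixed retry options; broker/ZooKeeper addresses, group id and topic name are
placeholders:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka08OffsetConfigExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "localhost:2181");   // required by the 0.8 consumer
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "my-consumer-group");
		// optional knobs read by ZookeeperOffsetHandler for its Curator retry policy
		props.setProperty("flink.zookeeper.base-sleep-time.ms", "100");
		props.setProperty("flink.zookeeper.max-retries", "10");

		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props));
		stream.print();

		env.execute("Kafka 0.8 consumer with ZooKeeper offset settings");
	}
}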

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
new file mode 100644
index 0000000..fabb0fe
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class Kafka08ITCase extends KafkaConsumerTestBase {
+
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	// ------------------------------------------------------------------------
+
+	@Test(timeout = 60000)
+	public void testFailOnNoBroker() throws Exception {
+		runFailOnNoBrokerTest();
+	}
+
+
+	@Test(timeout = 60000)
+	public void testConcurrentProducerConsumerTopology() throws Exception {
+		runSimpleConcurrentProducerConsumerTopology();
+	}
+
+//	@Test(timeout = 60000)
+//	public void testPunctuatedExplicitWMConsumer() throws Exception {
+//		runExplicitPunctuatedWMgeneratingConsumerTest(false);
+//	}
+
+//	@Test(timeout = 60000)
+//	public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
+//		runExplicitPunctuatedWMgeneratingConsumerTest(true);
+//	}
+
+	@Test(timeout = 60000)
+	public void testKeyValueSupport() throws Exception {
+		runKeyValueTest();
+	}
+
+	// --- canceling / failures ---
+
+	@Test(timeout = 60000)
+	public void testCancelingEmptyTopic() throws Exception {
+		runCancelingOnEmptyInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testCancelingFullTopic() throws Exception {
+		runCancelingOnFullInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testFailOnDeploy() throws Exception {
+		runFailOnDeployTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testInvalidOffset() throws Exception {
+		
+		final int parallelism = 1;
+		
+		// write 20 messages into topic:
+		final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1);
+
+		// set an invalid offset (beyond the 20 records written above) for partition 0 in ZooKeeper:
+		CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
+		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234);
+		curatorClient.close();
+
+		// read from topic
+		final int valuesCount = 20;
+		final int startFrom = 0;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		
+		readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);
+
+		deleteTestTopic(topic);
+	}
+
+	// --- source to partition mappings and exactly once ---
+
+	@Test(timeout = 60000)
+	public void testOneToOneSources() throws Exception {
+		runOneToOneExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testOneSourceMultiplePartitions() throws Exception {
+		runOneSourceMultiplePartitionsExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleSourcesOnePartition() throws Exception {
+		runMultipleSourcesOnePartitionExactlyOnceTest();
+	}
+
+	// --- broker failure ---
+
+	@Test(timeout = 60000)
+	public void testBrokerFailure() throws Exception {
+		runBrokerFailureTest();
+	}
+
+	// --- offset committing ---
+
+	@Test(timeout = 60000)
+	public void testCommitOffsetsToZookeeper() throws Exception {
+		runCommitOffsetsToKafka();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromZookeeperCommitOffsets() throws Exception {
+		runStartFromKafkaCommitOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testAutoOffsetRetrievalAndCommitToZookeeper() throws Exception {
+		runAutoOffsetRetrievalAndCommitToKafka();
+	}
+
+	@Test
+	public void runOffsetManipulationInZooKeeperTest() {
+		try {
+			final String topicName = "ZookeeperOffsetHandlerTest-Topic";
+			final String groupId = "ZookeeperOffsetHandlerTest-Group";
+
+			final Long offset = (long) (Math.random() * Long.MAX_VALUE);
+
+			CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
+			kafkaServer.createTestTopic(topicName, 3, 2);
+
+			ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset);
+
+			Long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
+
+			curatorFramework.close();
+
+			assertEquals(offset, fetchedOffset);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test(timeout = 60000)
+	public void testOffsetAutocommitTest() throws Exception {
+		final int parallelism = 3;
+
+		// write a sequence from 0 to 99 to each of the 3 partitions.
+		final String topicName = writeSequence("testOffsetAutocommit", 100, parallelism, 1);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		// NOTE: We are not enabling the checkpointing!
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.setParallelism(parallelism);
+
+		// the readSequence operation sleeps for 20 ms between each record, so reading
+		// 100 records takes roughly 2 seconds. Setting the commit interval to
+		// 25 * 20 ms = 500 ms makes sure that we commit roughly 3-4 times while
+		// reading, but at least once.
+		Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.commit.interval.ms", "500");
+
+		// read so that the offset can be committed to ZK
+		readSequence(env, readProps, parallelism, topicName, 100, 0);
+
+		// get the offset
+		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
+
+		Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
+		Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
+		Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
+		curatorFramework.close();
+		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
+
+		// ensure that the offset has been committed
+		boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) ||
+			(o2 != null && o2 > 0 && o2 <= 100) ||
+			(o3 != null && o3 > 0 && o3 <= 100);
+		assertTrue("Expecting at least one offset to be set o1=" + o1 + " o2=" + o2 + " o3=" + o3, atLeastOneOffsetSet);
+
+		deleteTestTopic(topicName);
+	}
+
+	// --- special executions ---
+
+	@Test(timeout = 60000)
+	public void testBigRecordJob() throws Exception {
+		runBigRecordTestTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
+
+	@Test(timeout = 60000)
+	public void testAllDeletes() throws Exception {
+		runAllDeletesTest();
+	}
+
+	@Test(timeout=60000)
+	public void testEndOfStream() throws Exception {
+		runEndOfStreamTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMetrics() throws Throwable {
+		runMetricsTest();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
new file mode 100644
index 0000000..6d0b140
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+	@Override
+	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+			final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+		return new Kafka08JsonTableSink(topic, properties, partitioner) {
+			@Override
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+					SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+				return kafkaProducer;
+			}
+		};
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected SerializationSchema<Row> getSerializationSchema() {
+		return new JsonRowSerializationSchema(FIELD_NAMES);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
new file mode 100644
index 0000000..a2d66ac
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+	@Override
+	protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+		return new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+		return (Class) JsonRowDeserializationSchema.class;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer08.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
new file mode 100644
index 0000000..5c951db
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class Kafka08ProducerITCase extends KafkaProducerTestBase {
+
+	@Test
+	public void testCustomPartitioning() {
+		runCustomPartitioningTest();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
new file mode 100644
index 0000000..9520f55
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.Test;
+
+public class KafkaConsumer08Test {
+
+	@Test
+	public void testValidateZooKeeperConfig() {
+		try {
+			// empty
+			Properties emptyProperties = new Properties();
+			try {
+				FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties);
+				fail("should fail with an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+
+			// no connect string (only group string)
+			Properties noConnect = new Properties();
+			noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
+			try {
+				FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect);
+				fail("should fail with an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+
+			// no group string (only connect string)
+			Properties noGroup = new Properties();
+			noGroup.put("zookeeper.connect", "localhost:47574");
+			try {
+				FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup);
+				fail("should fail with an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCreateSourceWithoutCluster() {
+		try {
+			Properties props = new Properties();
+			props.setProperty("zookeeper.connect", "localhost:56794");
+			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
+			props.setProperty("group.id", "non-existent-group");
+
+			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+			consumer.open(new Configuration());
+			fail();
+		}
+		catch (Exception e) {
+			assertTrue(e.getMessage().contains("Unable to retrieve any partitions"));
+		}
+	}
+
+	@Test
+	public void testAllBootstrapServerHostsAreInvalid() {
+		try {
+			String zookeeperConnect = "localhost:56794";
+			String bootstrapServers = "indexistentHost:11111";
+			String groupId = "non-existent-group";
+			Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
+			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
+					new SimpleStringSchema(), props);
+			consumer.open(new Configuration());
+			fail();
+		} catch (Exception e) {
+			assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!",
+					e.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+							+ "' config are invalid"));
+		}
+	}
+
+	@Test
+	public void testAtLeastOneBootstrapServerHostIsValid() {
+		try {
+			String zookeeperConnect = "localhost:56794";
+			// we declare one valid bootstrap server, namely the one with
+			// 'localhost'
+			String bootstrapServers = "indexistentHost:11111, localhost:22222";
+			String groupId = "non-existent-group";
+			Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
+			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
+					new SimpleStringSchema(), props);
+			consumer.open(new Configuration());
+			fail();
+		} catch (Exception e) {
+			// the test does not fail because we have one valid bootstrap server
+			assertTrue("The cause of the exception should not be 'all bootstrap servers are invalid'!",
+					!e.getMessage().contains("All the hosts provided in: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+							+ " config are invalid"));
+		}
+	}
+	
+	private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) {
+		Properties props = new Properties();
+		props.setProperty("zookeeper.connect", zookeeperConnect);
+		props.setProperty("bootstrap.servers", bootstrapServers);
+		props.setProperty("group.id", groupId);
+		return props;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
new file mode 100644
index 0000000..72d2772
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaLocalSystemTime implements Time {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
+
+	@Override
+	public long milliseconds() {
+		return System.currentTimeMillis();
+	}
+
+	@Override
+	public long nanoseconds() {
+		return System.nanoTime();
+	}
+
+	@Override
+	public void sleep(long ms) {
+		try {
+			Thread.sleep(ms);
+		} catch (InterruptedException e) {
+			LOG.warn("Interruption", e);
+		}
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..91fc286
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.concurrent.Future;
+
+
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducerBase.class)
+public class KafkaProducerTest extends TestLogger {
+	
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testPropagateExceptions() {
+		try {
+			// mock kafka producer
+			KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
+			
+			// partition setup
+			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+				// returning an unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
+				Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
+
+			// failure when trying to send an element
+			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
+				.thenAnswer(new Answer<Future<RecordMetadata>>() {
+					@Override
+					public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+						Callback callback = (Callback) invocation.getArguments()[1];
+						callback.onCompletion(null, new Exception("Test error"));
+						return null;
+					}
+				});
+			
+			// make sure the FlinkKafkaProducer instantiates our mock producer
+			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+			
+			// (1) producer that propagates errors
+
+			FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>(
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+
+			OneInputStreamOperatorTestHarness<String, Object> testHarness =
+					new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
+
+			testHarness.open();
+
+			try {
+				testHarness.processElement(new StreamRecord<>("value"));
+				testHarness.processElement(new StreamRecord<>("value"));
+				fail("This should fail with an exception");
+			}
+			catch (Exception e) {
+				assertNotNull(e.getCause());
+				assertNotNull(e.getCause().getMessage());
+				assertTrue(e.getCause().getMessage().contains("Test error"));
+			}
+
+			testHarness.close();
+
+			// (2) producer that only logs errors
+
+			FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>(
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+			producerLogging.setLogFailuresOnly(true);
+
+			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerLogging));
+
+			testHarness.open();
+
+			testHarness.processElement(new StreamRecord<>("value"));
+			testHarness.processElement(new StreamRecord<>("value"));
+
+			testHarness.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}
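
A short sketch (not part of this commit) of the user-facing counterpart to the test above: a
producer configured to only log failed records instead of failing the job. Broker address,
topic name and the input elements are placeholders:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka08LoggingProducerExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> stream = env.fromElements("a", "b", "c");

		Properties producerProps = new Properties();
		producerProps.setProperty("bootstrap.servers", "localhost:9092");

		FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
				"my-topic", new SimpleStringSchema(), producerProps);
		// only log asynchronous send failures instead of failing the job (the default propagates them)
		producer.setLogFailuresOnly(true);

		stream.addSink(producer);
		env.execute("Kafka 0.8 producer that only logs send failures");
	}
}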

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
new file mode 100644
index 0000000..c28799c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
+
+	@Test(timeout=60000)
+	public void testAutoOffsetReset() throws Exception {
+		runAutoOffsetResetTest();
+	}
+
+	@Test(timeout=60000)
+	public void testAutoOffsetResetNone() throws Exception {
+		runFailOnAutoOffsetResetNoneEager();
+	}
+}