Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:45 UTC

[28/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Refactor, cleanup, and fix kafka consumers

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
index 1f244c1..9638b84 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.internals;
 
-import com.google.common.base.Preconditions;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.OffsetRequest;
 import kafka.api.PartitionOffsetRequestInfo;
@@ -27,345 +27,576 @@ import kafka.javaapi.OffsetResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
-import org.apache.flink.api.java.tuple.Tuple2;
+
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.kafka_backport.common.Node;
 import org.apache.flink.kafka_backport.common.PartitionInfo;
 import org.apache.flink.kafka_backport.common.TopicPartition;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
+/**
+ * This fetcher uses Kafka's low-level API to pull data from a specific
+ * set of partitions and offsets for a certain topic.
+ * 
+ * <p>This code is in part based on the tutorial code for the low-level Kafka consumer.</p>
+ */
 public class LegacyFetcher implements Fetcher {
-	public static Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+	
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
 
+	/** The topic from which this fetcher pulls data */
 	private final String topic;
+	
+	/** The properties that configure the Kafka connection */
+	private final Properties config;
+	
+	/** The task name, to give more readable names to the spawned threads */
+	private final String taskName;
+	
+	/** The first error that occurred in a connection thread */
+	private final AtomicReference<Throwable> error;
+
+	/** The partitions that the fetcher should read, with their starting offsets */
 	private Map<TopicPartition, Long> partitionsToRead;
-	private boolean running = true;
-	private Properties config;
-
-	public final static String QUEUE_SIZE_KEY = "flink.kafka.consumer.queue.size";
-	public final static String DEFAULT_QUEUE_SIZE = "10000";
+	
+	/** Reference to the thread that executes the run() method. */
+	private volatile Thread mainThread;
+	
+	/** Flag to shut the fetcher down */
+	private volatile boolean running = true;
 
 
 	public LegacyFetcher(String topic, Properties props) {
-		config = props;
-		this.topic = topic;
+		this(topic, props, "");
+	}
+	
+	public LegacyFetcher(String topic, Properties props, String taskName) {
+		this.config = checkNotNull(props, "The config properties cannot be null");
+		this.topic = checkNotNull(topic, "The topic cannot be null");
+		this.taskName = taskName;
+		this.error = new AtomicReference<>();
 	}
 
+	// ------------------------------------------------------------------------
+	//  Fetcher methods
+	// ------------------------------------------------------------------------
+	
 	@Override
-	public void partitionsToRead(List<TopicPartition> partitions) {
+	public void setPartitionsToRead(List<TopicPartition> partitions) {
 		partitionsToRead = new HashMap<TopicPartition, Long>(partitions.size());
-		for(TopicPartition tp: partitions) {
-			partitionsToRead.put(tp, FlinkKafkaConsumerBase.OFFSET_NOT_SET);
+		for (TopicPartition tp: partitions) {
+			partitionsToRead.put(tp, FlinkKafkaConsumer.OFFSET_NOT_SET);
 		}
 	}
 
 	@Override
 	public void seek(TopicPartition topicPartition, long offsetToRead) {
-		if(partitionsToRead == null) {
+		if (partitionsToRead == null) {
 			throw new IllegalArgumentException("No partitions to read set");
 		}
-		if(!partitionsToRead.containsKey(topicPartition)) {
-			throw new IllegalArgumentException("Can not set offset on a partition ("+topicPartition+") we are not going to read. " +
-					"Partitions to read "+partitionsToRead);
+		if (!partitionsToRead.containsKey(topicPartition)) {
+			throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition
+					+ ") we are not going to read. Partitions to read " + partitionsToRead);
 		}
 		partitionsToRead.put(topicPartition, offsetToRead);
 	}
-
+	
 	@Override
 	public void close() {
-		running = false;
+		// flag needs to be checked by the run() method that creates the spawned threads
+		this.running = false;
+		
+		// all other cleanup is made by the run method itself
 	}
 
 	@Override
-	public <T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] lastOffsets) {
-		if(partitionsToRead == null || partitionsToRead.size() == 0) {
+	public <T> void run(SourceFunction.SourceContext<T> sourceContext, 
+						DeserializationSchema<T> valueDeserializer,
+						long[] lastOffsets) throws Exception {
+		
+		if (partitionsToRead == null || partitionsToRead.size() == 0) {
 			throw new IllegalArgumentException("No partitions set");
 		}
+		
+		// NOTE: This method needs to always release all resources it acquires
+		
+		this.mainThread = Thread.currentThread();
 
-		LOG.info("Reading from partitions "+partitionsToRead+" using the legacy fetcher");
+		LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher");
+		
 		// get lead broker for each partition
-		List<PartitionInfo> allPartitionsInTopic = FlinkKafkaConsumerBase.getPartitionsForTopic(topic, config);
-
+		
+		// NOTE: The Kafka client apparently sometimes locks itself in an infinite loop
+		// when it is interrupted, so we run it only in a separate thread.
+		// Since it sometimes refuses to shut down, we resort to the admittedly harsh
+		// means of killing the thread after a timeout.
+		PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topic, config);
+		infoFetcher.start();
+		
+		KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
+		watchDog.start();
+		
+		final List<PartitionInfo> allPartitionsInTopic = infoFetcher.getPartitions();
+		
 		// brokers to fetch partitions from.
 		int fetchPartitionsCount = 0;
 		Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<Node, List<FetchPartition>>();
-		for(PartitionInfo partitionInfo : allPartitionsInTopic) {
-			if(partitionInfo.leader() == null) {
-				throw new RuntimeException("Unable to consume partition "+partitionInfo.partition()+" from topic "+partitionInfo.topic()+" because it does not have a leader");
+		
+		for (PartitionInfo partitionInfo : allPartitionsInTopic) {
+			if (partitionInfo.leader() == null) {
+				throw new RuntimeException("Unable to consume partition " + partitionInfo.partition()
+						+ " from topic "+partitionInfo.topic()+" because it does not have a leader");
 			}
-			for(Map.Entry<TopicPartition, Long> partitionToRead: partitionsToRead.entrySet()) {
-				if(partitionToRead.getKey().partition() == partitionInfo.partition()) {
+			
+			for (Map.Entry<TopicPartition, Long> entry : partitionsToRead.entrySet()) {
+				final TopicPartition topicPartition = entry.getKey();
+				final long offset = entry.getValue();
+				
+				// check if that partition is for us
+				if (topicPartition.partition() == partitionInfo.partition()) {
 					List<FetchPartition> partitions = fetchBrokers.get(partitionInfo.leader());
-					if(partitions == null) {
+					if (partitions == null) {
 						partitions = new ArrayList<FetchPartition>();
+						fetchBrokers.put(partitionInfo.leader(), partitions);
 					}
-					FetchPartition fp = new FetchPartition();
-					fp.nextOffsetToRead = partitionToRead.getValue();
-					fp.partition = partitionToRead.getKey().partition();
-					partitions.add(fp);
+					
+					partitions.add(new FetchPartition(topicPartition.partition(), offset));
 					fetchPartitionsCount++;
-					fetchBrokers.put(partitionInfo.leader(), partitions);
+					
 				}
+				// else this partition is not for us
 			}
 		}
-		if(partitionsToRead.size() != fetchPartitionsCount) {
-			throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only "+fetchPartitionsCount+" partition infos with lead brokers.");
+		
+		if (partitionsToRead.size() != fetchPartitionsCount) {
+			throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only "
+					+ fetchPartitionsCount + " partition infos with lead brokers.");
 		}
-		// Create a queue for the threads to communicate
-		int queueSize = Integer.valueOf(config.getProperty(QUEUE_SIZE_KEY, DEFAULT_QUEUE_SIZE));
-		LinkedBlockingQueue<Tuple2<MessageAndOffset, Integer>> messageQueue = new LinkedBlockingQueue<Tuple2<MessageAndOffset, Integer>>(queueSize);
 
 		// create SimpleConsumers for each broker
-		List<SimpleConsumerThread> consumers = new ArrayList<SimpleConsumerThread>(fetchBrokers.size());
-		for(Map.Entry<Node, List<FetchPartition>> brokerInfo: fetchBrokers.entrySet()) {
-			SimpleConsumerThread thread = new SimpleConsumerThread(this.config, topic, brokerInfo.getKey(), brokerInfo.getValue(), messageQueue);
+		ArrayList<SimpleConsumerThread<?>> consumers = new ArrayList<>(fetchBrokers.size());
+		
+		for (Map.Entry<Node, List<FetchPartition>> brokerInfo : fetchBrokers.entrySet()) {
+			final Node broker = brokerInfo.getKey();
+			final List<FetchPartition> partitionsList = brokerInfo.getValue();
+			
+			FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
+
+			SimpleConsumerThread<T> thread = new SimpleConsumerThread<T>(this, config, topic,
+					broker, partitions, sourceContext, valueDeserializer, lastOffsets);
+
+			thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
+					taskName, broker.idString(), broker.host(), broker.port()));
 			thread.setDaemon(true);
-			thread.setName("KafkaConsumer-SimpleConsumer-" + brokerInfo.getKey().idString());
-			thread.start();
 			consumers.add(thread);
-			LOG.info("Starting thread "+thread.getName()+" for fetching from broker "+brokerInfo.getKey().host());
 		}
-
-		// read from queue:
-		while(running) {
-			try {
-				Tuple2<MessageAndOffset, Integer> msg = messageQueue.take();
-				ByteBuffer payload = msg.f0.message().payload();
-				byte[] valueByte = new byte[payload.limit()];
-				payload.get(valueByte);
-				T value = valueDeserializer.deserialize(valueByte);
-				synchronized (sourceContext.getCheckpointLock()) {
-					lastOffsets[msg.f1] = msg.f0.offset();
-					sourceContext.collect(value);
+		
+		// last check whether we should abort.
+		if (!running) {
+			return;
+		}
+		
+		// start all consumer threads
+		for (SimpleConsumerThread<?> t : consumers) {
+			LOG.info("Starting thread {}", t.getName());
+			t.start();
+		}
+		
+		// wait until all consumer threads are done, or until we are aborted, or until
+		// an error occurred in one of the fetcher threads
+		try {
+			boolean someConsumersRunning = true;
+			while (running && error.get() == null && someConsumersRunning) {
+				try {
+					// wait for the consumer threads. if an error occurs, we are interrupted
+					for (SimpleConsumerThread<?> t : consumers) {
+						t.join();
+					}
+	
+					// safety net
+					someConsumersRunning = false;
+					for (SimpleConsumerThread<?> t : consumers) {
+						someConsumersRunning |= t.isAlive();
+					}
 				}
-			} catch (InterruptedException e) {
-				LOG.info("Queue consumption thread got interrupted. Stopping consumption and interrupting other threads");
-				running = false;
-				for(SimpleConsumerThread t: consumers) {
-					t.interrupt();
+				catch (InterruptedException e) {
+					// ignore. we should notice what happened in the next loop check
 				}
 			}
-
-			// see how the consumer threads are doing:
-			for(SimpleConsumerThread t: consumers) {
-				if(t.getError() != null) {
-					throw new RuntimeException("Consumer thread "+t.getName()+" had an exception", t.getError());
-				}
+			
+			// make sure any asynchronous error is noticed
+			Throwable error = this.error.get();
+			if (error != null) {
+				throw new Exception(error.getMessage(), error);
 			}
 		}
-
-		for(SimpleConsumerThread t: consumers) {
-			t.close();
+		finally {
+			// make sure that in any case (completion, abort, error), all spawned threads are stopped
+			for (SimpleConsumerThread<?> t : consumers) {
+				if (t.isAlive()) {
+					t.cancel();
+				}
+			}
 		}
-		sourceContext.close();
-	}
-
-	@Override
-	public void stop() {
-		running = false;
 	}
-
-	@Override
-	public void commit(Map<TopicPartition, Long> offsetsToCommit) {
-		throw new UnsupportedOperationException("This fetcher does not support committing offsets");
+	
+	/**
+	 * Reports an error from a fetch thread. This will cause the main thread to see this error,
+	 * abort, and cancel all other fetch threads.
+	 * 
+	 * @param error The error to report.
+	 */
+	void onErrorInFetchThread(Throwable error) {
+		if (this.error.compareAndSet(null, error)) {
+			// we are the first to report an error
+			if (mainThread != null) {
+				mainThread.interrupt();
+			}
+		}
 	}
 
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Representation of a partition to fetch.
 	 */
 	private static class FetchPartition {
-		/**
-		 * ID of the partition within the topic (0 indexed, as given by Kafka)
-		 */
-		public int partition;
-		/**
-		 * Offset pointing at the next element to read from that partition.
-		 */
-		public long nextOffsetToRead;
-
+		
+		/** ID of the partition within the topic (0 indexed, as given by Kafka) */
+		int partition;
+		
+		/** Offset pointing at the next element to read from that partition. */
+		long nextOffsetToRead;
+
+		FetchPartition(int partition, long nextOffsetToRead) {
+			this.partition = partition;
+			this.nextOffsetToRead = nextOffsetToRead;
+		}
+		
 		@Override
 		public String toString() {
-			return "FetchPartition{" +
-					"partition=" + partition +
-					", offset=" + nextOffsetToRead +
-					'}';
+			return "FetchPartition {partition=" + partition + ", offset=" + nextOffsetToRead + '}';
 		}
 	}
 
-	// --------------------------  Thread for a connection to a broker --------------------------
-
-	private static class SimpleConsumerThread extends Thread {
-
-		private final SimpleConsumer consumer;
-		private final List<FetchPartition> partitions;
-		private final LinkedBlockingQueue<Tuple2<MessageAndOffset, Integer>> messageQueue;
-		private final String clientId;
+	// ------------------------------------------------------------------------
+	//  Per broker fetcher
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Each broker needs its separate connection. This thread implements the connection to
+	 * one broker. The connection can fetch multiple partitions from the broker.
+	 * 
+	 * @param <T> The data type fetched.
+	 */
+	private static class SimpleConsumerThread<T> extends Thread {
+		
+		private final SourceFunction.SourceContext<T> sourceContext;
+		private final DeserializationSchema<T> valueDeserializer;
+		private final long[] offsetsState;
+		
+		private final FetchPartition[] partitions;
+		
+		private final Node broker;
 		private final String topic;
+		private final Properties config;
 
-		private final int fetchSize;
-		private final int maxWait;
-		private final int minBytes;
+		private final LegacyFetcher owner;
 
-		private boolean running = true;
-		private Throwable error = null;
+		private SimpleConsumer consumer;
+		
+		private volatile boolean running = true;
 
 
 		// exceptions are thrown locally
-		public SimpleConsumerThread(Properties config, String topic, Node leader, List<FetchPartition> partitions, LinkedBlockingQueue<Tuple2<MessageAndOffset, Integer>> messageQueue) {
-			Preconditions.checkNotNull(leader, "Leader can not be null");
-			Preconditions.checkNotNull(config, "The config properties can not be null");
-			// these are the actual configuration values of Kafka + their original default values.
-			int soTimeout = Integer.valueOf(config.getProperty("socket.timeout.ms", "30000"));
-			int bufferSize = Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536"));
-
-			this.fetchSize = Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576"));
-			this.maxWait = Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100"));
-			this.minBytes = Integer.valueOf(config.getProperty("fetch.min.bytes", "1"));
-
+		public SimpleConsumerThread(LegacyFetcher owner,
+									Properties config, String topic,
+									Node broker,
+									FetchPartition[] partitions,
+									SourceFunction.SourceContext<T> sourceContext,
+									DeserializationSchema<T> valueDeserializer,
+									long[] offsetsState) {
+			this.owner = owner;
+			this.config = config;
 			this.topic = topic;
+			this.broker = broker;
 			this.partitions = partitions;
-			this.messageQueue = messageQueue;
-			this.clientId = "flink-kafka-consumer-legacy-" + leader.idString();
-			// create consumer
-			consumer = new SimpleConsumer(leader.host(), leader.port(), bufferSize, soTimeout, clientId);
-
-			// list of partitions for which we need to get offsets (this is only effective if the offset is really not initialized
-			List<FetchPartition> getOffsetPartitions = new ArrayList<FetchPartition>();
-			for (FetchPartition fp : partitions) {
-				if (fp.nextOffsetToRead == FlinkKafkaConsumerBase.OFFSET_NOT_SET) {
-					// retrieve the offset from the consumer
-					getOffsetPartitions.add(fp);
-				}
-			}
-			if (getOffsetPartitions.size() > 0) {
-				long timeType = 0;
-				if (config.getProperty("auto.offset.reset", "latest").equals("latest")) {
-					timeType = OffsetRequest.LatestTime();
-				} else {
-					timeType = OffsetRequest.EarliestTime();
-				}
-				getLastOffset(consumer, topic, getOffsetPartitions, timeType);
-				LOG.info("No offsets found for topic " + topic + ", fetched the following start offsets {}", getOffsetPartitions);
-			}
+			this.sourceContext = checkNotNull(sourceContext);
+			this.valueDeserializer = checkNotNull(valueDeserializer);
+			this.offsetsState = checkNotNull(offsetsState);
 		}
 
 		@Override
 		public void run() {
 			try {
+				// set up the config values
+				final String clientId = "flink-kafka-consumer-legacy-" + broker.idString();
+
+				// these are the actual configuration values of Kafka + their original default values.
+				
+				final int soTimeout = Integer.valueOf(config.getProperty("socket.timeout.ms", "30000"));
+				final int bufferSize = Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536"));
+				final int fetchSize = Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576"));
+				final int maxWait = Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100"));
+				final int minBytes = Integer.valueOf(config.getProperty("fetch.min.bytes", "1"));
+				
+				// create the Kafka consumer that we actually use for fetching
+				consumer = new SimpleConsumer(broker.host(), broker.port(), bufferSize, soTimeout, clientId);
+
+				// make sure that all partitions have some offsets to start with
+				// those partitions that do not have an offset from a checkpoint need to get
+				// their start offset from Kafka, via an offset request to the lead broker
+				
+				List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<FetchPartition>();
+
+				for (FetchPartition fp : partitions) {
+					if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
+						// retrieve the offset from the consumer
+						partitionsToGetOffsetsFor.add(fp);
+					}
+				}
+				if (partitionsToGetOffsetsFor.size() > 0) {
+					long timeType;
+					if (config.getProperty("auto.offset.reset", "latest").equals("latest")) {
+						timeType = OffsetRequest.LatestTime();
+					} else {
+						timeType = OffsetRequest.EarliestTime();
+					}
+					getLastOffset(consumer, topic, partitionsToGetOffsetsFor, timeType);
+					LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
+							topic, partitionsToGetOffsetsFor);
+				}
+				
+				// Now, the actual work starts :-)
+				
 				while (running) {
 					FetchRequestBuilder frb = new FetchRequestBuilder();
-					frb.clientId(this.clientId);
+					frb.clientId(clientId);
 					frb.maxWait(maxWait);
 					frb.minBytes(minBytes);
+					
 					for (FetchPartition fp : partitions) {
-						frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, this.fetchSize);
+						frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, fetchSize);
 					}
 					kafka.api.FetchRequest fetchRequest = frb.build();
 					LOG.debug("Issuing fetch request {}", fetchRequest);
 
-					FetchResponse fetchResponse = null;
+					FetchResponse fetchResponse;
 					fetchResponse = consumer.fetch(fetchRequest);
 
-
 					if (fetchResponse.hasError()) {
 						String exception = "";
 						for (FetchPartition fp : partitions) {
 							short code;
 							if ((code = fetchResponse.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
-								exception += "\nException for partition " + fp.partition + ": " + StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+								exception += "\nException for partition " + fp.partition + ": " + 
+										StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
 							}
 						}
-						throw new RuntimeException("Error while fetching from broker: " + exception);
+						throw new IOException("Error while fetching from broker: " + exception);
 					}
 
 					int messagesInFetch = 0;
 					for (FetchPartition fp : partitions) {
-						ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition);
+						final ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition);
+						final int partition = fp.partition;
+						
 						for (MessageAndOffset msg : messageSet) {
-							messagesInFetch++;
-							try {
+							if (running) {
+								messagesInFetch++;
 								if (msg.offset() < fp.nextOffsetToRead) {
-									LOG.info("Skipping message with offset " + msg.offset() + " because we have seen messages until " + fp.nextOffsetToRead + " from partition " + fp.partition + " already");
 									// we have seen this message already
+									LOG.info("Skipping message with offset " + msg.offset()
+											+ " because we have seen messages until " + fp.nextOffsetToRead
+											+ " from partition " + fp.partition + " already");
 									continue;
 								}
-								messageQueue.put(new Tuple2<MessageAndOffset, Integer>(msg, fp.partition));
-								fp.nextOffsetToRead = msg.offset() + 1; // advance offset for the next request
-							} catch (InterruptedException e) {
-								LOG.debug("Consumer thread got interrupted. Stopping consumption");
-								running = false;
+								
+								ByteBuffer payload = msg.message().payload();
+								byte[] valueByte = new byte[payload.remaining()];
+								payload.get(valueByte);
+								
+								final T value = valueDeserializer.deserialize(valueByte);
+								final long offset = msg.offset();
+										
+								synchronized (sourceContext.getCheckpointLock()) {
+									offsetsState[partition] = offset;
+									sourceContext.collect(value);
+								}
+								
+								// advance offset for the next request
+								fp.nextOffsetToRead = offset + 1;
+							}
+							else {
+								// no longer running
+								return;
 							}
 						}
 					}
 					LOG.debug("This fetch contained {} messages", messagesInFetch);
 				}
-			} catch(Throwable cause) {
-				this.error = new RuntimeException("Error while reading data in thread "+this.getName(), cause);
-			} finally {
+			}
+			catch (Throwable t) {
+				// report to the main thread
+				owner.onErrorInFetchThread(t);
+			}
+			finally {
 				// end of run loop. close connection to consumer
-				consumer.close();
+				if (consumer != null) {
+					// closing the consumer should not fail the program
+					try {
+						consumer.close();
+					}
+					catch (Throwable t) {
+						LOG.error("Error while closing the Kafka simple consumer", t);
+					}
+				}
 			}
-
 		}
 
-		public void close() {
-			running = false;
-			consumer.close();
+		/**
+		 * Cancels this fetch thread. The thread will release all resources and terminate.
+		 */
+		public void cancel() {
+			this.running = false;
+			
+			// interrupt whatever the consumer is doing
+			if (consumer != null) {
+				consumer.close();
+			}
+			
+			this.interrupt();
 		}
 
-		public Throwable getError() {
-			return error;
+		/**
+		 * Request latest offsets for a set of partitions, via a Kafka consumer.
+		 *
+		 * @param consumer The consumer connected to the lead broker
+		 * @param topic The topic name
+		 * @param partitions The list of partitions we need offsets for
+		 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
+		 */
+		private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
+
+			Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+			for (FetchPartition fp: partitions) {
+				TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
+				requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+			}
+
+			kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+			OffsetResponse response = consumer.getOffsetsBefore(request);
+
+			if (response.hasError()) {
+				String exception = "";
+				for (FetchPartition fp: partitions) {
+					short code;
+					if ((code = response.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
+						exception += "\nException for partition " + fp.partition + ": " + StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+					}
+				}
+				throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions
+						+ ". " + exception);
+			}
+
+			for (FetchPartition fp: partitions) {
+				// the resulting offset is the next offset we are going to read
+				// for not-yet-consumed partitions, it is 0.
+				fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
+			}
 		}
 	}
+	
+	private static class PartitionInfoFetcher extends Thread {
 
-	/**
-	 * Request latest offsets from Kafka.
-	 *
-	 * @param consumer consumer connected to lead broker
-	 * @param topic topic name
-	 * @param partitions list of partitions we need offsets for
-	 * @param whichTime type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
-	 */
-	private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
+		private final String topic;
+		private final Properties properties;
+		
+		private volatile List<PartitionInfo> result;
+		private volatile Throwable error;
 
-		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-		for(FetchPartition fp: partitions) {
-			TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
-			requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+		
+		PartitionInfoFetcher(String topic, Properties properties) {
+			this.topic = topic;
+			this.properties = properties;
 		}
 
-		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
-		OffsetResponse response = consumer.getOffsetsBefore(request);
-
-		if (response.hasError()) {
-			String exception = "";
-			for(FetchPartition fp: partitions) {
-				short code;
-				if( (code=response.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
-					exception += "\nException for partition "+fp.partition+": "+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
-				}
+		@Override
+		public void run() {
+			try {
+				result = FlinkKafkaConsumer.getPartitionsForTopic(topic, properties);
+			}
+			catch (Throwable t) {
+				this.error = t;
+			}
+		}
+		
+		public List<PartitionInfo> getPartitions() throws Exception {
+			try {
+				this.join();
 			}
-			throw new RuntimeException("Unable to get last offset for topic "+topic+" and partitions "+partitions +". "+exception);
+			catch (InterruptedException e) {
+				throw new Exception("Partition fetching was cancelled before completion");
+			}
+			
+			if (error != null) {
+				throw new Exception("Failed to fetch partitions for topic " + topic, error);
+			}
+			if (result != null) {
+				return result;
+			}
+			throw new Exception("Partition fetching failed");
 		}
+	}
 
-		for(FetchPartition fp: partitions) {
-			// the resulting offset is the next offset we are going to read
-			// for not-yet-consumed partitions, it is 0.
-			fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
+	private static class KillerWatchDog extends Thread {
+		
+		private final Thread toKill;
+		private final long timeout;
+
+		private KillerWatchDog(Thread toKill, long timeout) {
+			super("KillerWatchDog");
+			setDaemon(true);
+			
+			this.toKill = toKill;
+			this.timeout = timeout;
 		}
 
+		@SuppressWarnings("deprecation")
+		@Override
+		public void run() {
+			final long deadline = System.currentTimeMillis() + timeout;
+			long now;
+			
+			while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
+				try {
+					toKill.join(deadline - now);
+				}
+				catch (InterruptedException e) {
+					// ignore here, our job is important!
+				}
+			}
+			
+			// this is harsh, but this watchdog is a last resort
+			if (toKill.isAlive()) {
+				toKill.stop();
+			}
+		}
 	}
-
 }
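
For orientation, here is a minimal sketch of how a surrounding consumer might drive the refactored LegacyFetcher: assign the partitions, seek those with a known offset, then call run(). The sketch class, its method name, and the way the SourceContext, deserialization schema, and restored offsets are obtained are assumptions for illustration only; the "+ 1" follows the next-offset-to-read convention that seek() uses above.

package org.apache.flink.streaming.connectors.internals;

import java.util.List;
import java.util.Properties;

import org.apache.flink.kafka_backport.common.TopicPartition;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

class LegacyFetcherUsageSketch {

	static <T> void driveFetcher(String topic, Properties kafkaProps, String taskName,
			List<TopicPartition> assignedPartitions,
			long[] restoredOffsets, // last read offset per partition, or OFFSET_NOT_SET
			SourceFunction.SourceContext<T> sourceContext,
			DeserializationSchema<T> deserializer) throws Exception {

		LegacyFetcher fetcher = new LegacyFetcher(topic, kafkaProps, taskName);

		// assign the partitions this subtask should read
		fetcher.setPartitionsToRead(assignedPartitions);

		// seek partitions with a restored offset; seek() takes the next offset to read
		for (TopicPartition tp : assignedPartitions) {
			long lastRead = restoredOffsets[tp.partition()];
			if (lastRead != FlinkKafkaConsumer.OFFSET_NOT_SET) {
				fetcher.seek(tp, lastRead + 1);
			}
		}

		// blocks until close() is called or a fetch thread reports an error
		fetcher.run(sourceContext, deserializer, restoredOffsets);
	}
}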

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java
new file mode 100644
index 0000000..db9424e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.kafka_backport.clients.consumer.CommitType;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerRecord;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerRecords;
+import org.apache.flink.kafka_backport.clients.consumer.KafkaConsumer;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.serialization.ByteArrayDeserializer;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A fetcher that uses the new Kafka consumer API to fetch data for a specified set of partitions.
+ */
+public class NewConsumerApiFetcher implements Fetcher, OffsetHandler {
+
+	private static final String POLL_TIMEOUT_PROPERTY = "flink.kafka.consumer.poll.timeout";
+	private static final long DEFAULT_POLL_TIMEOUT = 50;
+	
+	private static final ByteArrayDeserializer NO_OP_SERIALIZER = new ByteArrayDeserializer();
+
+	
+	private final KafkaConsumer<byte[], byte[]> fetcher;
+	
+	private final long pollTimeout;
+	
+	private volatile boolean running = true;
+
+	
+	public NewConsumerApiFetcher(Properties props) {
+		this.pollTimeout = props.containsKey(POLL_TIMEOUT_PROPERTY) ?
+				Long.valueOf(props.getProperty(POLL_TIMEOUT_PROPERTY)) :
+				DEFAULT_POLL_TIMEOUT;
+		
+		this.fetcher = new KafkaConsumer<byte[], byte[]>(props, null, NO_OP_SERIALIZER, NO_OP_SERIALIZER);
+	}
+
+	@Override
+	public void setPartitionsToRead(List<TopicPartition> partitions) {
+		synchronized (fetcher) {
+			if (fetcher.subscriptions().isEmpty()) {
+				fetcher.subscribe(partitions.toArray(new TopicPartition[partitions.size()]));
+			}
+			else {
+				throw new IllegalStateException("Fetcher has already subscribed to its set of partitions");
+			}
+		}
+	}
+
+	@Override
+	public void seek(TopicPartition topicPartition, long offsetToRead) {
+		synchronized (fetcher) {
+			fetcher.seek(topicPartition, offsetToRead);
+		}
+	}
+
+	@Override
+	public void close() {
+		running = false;
+		synchronized (fetcher) {
+			fetcher.close();
+		}
+	}
+
+	@Override
+	public <T> void run(SourceFunction.SourceContext<T> sourceContext,
+						DeserializationSchema<T> valueDeserializer, long[] lastOffsets) {
+		while (running) {
+			// poll() always returns a new ConsumerRecords object
+			ConsumerRecords<byte[], byte[]> consumed;
+			synchronized (fetcher) {
+				consumed = fetcher.poll(pollTimeout);
+			}
+
+			final Iterator<ConsumerRecord<byte[], byte[]>> records = consumed.iterator();
+			while (running && records.hasNext()) {
+				ConsumerRecord<byte[], byte[]> record = records.next();
+				T value = valueDeserializer.deserialize(record.value());
+				
+				// synchronize inside the loop to allow checkpoints in between batches
+				synchronized (sourceContext.getCheckpointLock()) {
+					sourceContext.collect(value);
+					lastOffsets[record.partition()] = record.offset();
+				}
+			}
+		}
+	}
+
+	@Override
+	public void commit(Map<TopicPartition, Long> offsetsToCommit) {
+		synchronized (fetcher) {
+			fetcher.commit(offsetsToCommit, CommitType.SYNC);
+		}
+	}
+
+	@Override
+	public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
+		// no need to do anything here.
+		// if Kafka manages the offsets, it has them automatically
+	}
+}
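
As a configuration sketch (not prescribed by this commit): the only Flink-specific key read by this fetcher is "flink.kafka.consumer.poll.timeout"; the broker address and group id below are placeholders, and the other keys are standard Kafka new-consumer settings.

package org.apache.flink.streaming.connectors.internals;

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.kafka_backport.common.TopicPartition;

class NewConsumerApiFetcherUsageSketch {

	static NewConsumerApiFetcher createFetcher(String topic) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
		props.setProperty("group.id", "flink-kafka-example");     // placeholder group
		// optional: how long each poll() call blocks; defaults to 50 ms in the fetcher
		props.setProperty("flink.kafka.consumer.poll.timeout", "100");

		NewConsumerApiFetcher fetcher = new NewConsumerApiFetcher(props);

		// subscribe the fetcher to the partitions this subtask owns
		fetcher.setPartitionsToRead(Arrays.asList(
				new TopicPartition(topic, 0),
				new TopicPartition(topic, 1)));
		return fetcher;
	}
}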

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
new file mode 100644
index 0000000..d7eb19d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The offset handler is responsible for locating the initial partition offsets 
+ * where the source should start reading, as well as committing offsets from completed
+ * checkpoints.
+ */
+public interface OffsetHandler {
+
+	/**
+	 * Commits the given offsets for the partitions. May commit the offsets to the Kafka broker,
+	 * or to ZooKeeper, based on its configured behavior.
+	 *
+	 * @param offsetsToCommit The offset to commit, per partition.
+	 */
+	void commit(Map<TopicPartition, Long> offsetsToCommit) throws Exception;
+
+	/**
+	 * Positions the given fetcher to the initial read offsets where the stream consumption
+	 * will start from.
+	 * 
+	 * @param partitions The partitions for which to seek the fetcher to the beginning.
+	 * @param fetcher The fetcher that will pull data from Kafka and must be positioned.
+	 */
+	void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) throws Exception;
+
+	/**
+	 * Closes the offset handler, releasing all resources.
+	 * 
+	 * @throws IOException Thrown, if the closing fails.
+	 */
+	void close() throws IOException;
+}
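
To make the contract concrete, a minimal sketch of the life cycle a caller would follow; the sketch class and method are illustrative only and use nothing beyond the interface methods declared above.

package org.apache.flink.streaming.connectors.internals;

import java.util.List;
import java.util.Map;

import org.apache.flink.kafka_backport.common.TopicPartition;

class OffsetHandlerLifecycleSketch {

	static void lifecycle(OffsetHandler offsetHandler, Fetcher fetcher,
			List<TopicPartition> partitions,
			Map<TopicPartition, Long> checkpointedOffsets) throws Exception {
		try {
			// start-up: seek the fetcher to the offsets recorded for this consumer group
			offsetHandler.seekFetcherToInitialOffsets(partitions, fetcher);

			// ... the fetcher runs and emits records while offsets are tracked ...

			// after a checkpoint completes: persist the offsets of that checkpoint
			offsetHandler.commit(checkpointedOffsets);
		}
		finally {
			offsetHandler.close();
		}
	}
}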

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
new file mode 100644
index 0000000..a6417a7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * Simple ZooKeeper serializer for Strings.
+ */
+public class ZooKeeperStringSerializer implements ZkSerializer {
+
+	private static final Charset CHARSET = Charset.forName("UTF-8");
+	
+	@Override
+	public byte[] serialize(Object data) {
+		if (data instanceof String) {
+			return ((String) data).getBytes(CHARSET);
+		}
+		else {
+			throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
+		}
+	}
+
+	@Override
+	public Object deserialize(byte[] bytes) {
+		if (bytes == null) {
+			return null;
+		}
+		else {
+			return new String(bytes, CHARSET);
+		}
+	}
+}
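
A quick round-trip sketch of the intended behavior: strings are written as UTF-8 bytes and read back unchanged, null data deserializes to null, and non-string inputs are rejected with an IllegalArgumentException.

package org.apache.flink.streaming.connectors.internals;

class ZooKeeperStringSerializerExample {

	public static void main(String[] args) {
		ZooKeeperStringSerializer serializer = new ZooKeeperStringSerializer();

		byte[] bytes = serializer.serialize("42");            // UTF-8 bytes of "42"
		String back = (String) serializer.deserialize(bytes); // "42"
		System.out.println(back);

		System.out.println(serializer.deserialize(null));     // prints null
	}
}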

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
new file mode 100644
index 0000000..9dd1192
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import kafka.common.TopicAndPartition;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerConfig;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
+import org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class ZookeeperOffsetHandler implements OffsetHandler {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
+	
+	private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
+	
+	
+	private final ZkClient zkClient;
+	
+	private final String groupId;
+
+	
+	public ZookeeperOffsetHandler(Properties props) {
+		this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+		
+		if (this.groupId == null) {
+			throw new IllegalArgumentException("Required property '"
+					+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
+		}
+		
+		String zkConnect = props.getProperty("zookeeper.connect");
+		if (zkConnect == null) {
+			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
+		}
+		
+		zkClient = new ZkClient(zkConnect,
+				Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
+				Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
+				new ZooKeeperStringSerializer());
+	}
+
+
+	@Override
+	public void commit(Map<TopicPartition, Long> offsetsToCommit) {
+		for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
+			TopicPartition tp = entry.getKey();
+			long offset = entry.getValue();
+			
+			if (offset >= 0) {
+				setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset);
+			}
+		}
+	}
+
+	@Override
+	public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
+		for (TopicPartition tp : partitions) {
+			long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition());
+
+			if (offset != OFFSET_NOT_SET) {
+				LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
+						tp.partition(), offset);
+
+				// the offset in ZooKeeper is the last read offset; seek() expects the next offset to read
+				fetcher.seek(tp, offset + 1);
+			}
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		zkClient.close();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Communication with Zookeeper
+	// ------------------------------------------------------------------------
+	
+	public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
+		TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+		ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
+	}
+
+	public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
+		TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+
+		scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient,
+				topicDirs.consumerOffsetDir() + "/" + tap.partition());
+
+		if (data._1().isEmpty()) {
+			return OFFSET_NOT_SET;
+		} else {
+			return Long.valueOf(data._1().get());
+		}
+	}
+}
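
A hedged construction example: the ZooKeeper connection string, group id, and topic below are placeholders, and only the properties read by the constructor above are set. Note that constructing the handler opens a ZooKeeper connection, so the example assumes a reachable ZooKeeper at the placeholder address.

package org.apache.flink.streaming.connectors.internals;

import java.util.Collections;
import java.util.Properties;

import org.apache.flink.kafka_backport.common.TopicPartition;

class ZookeeperOffsetHandlerExample {

	static void commitExample() throws Exception {
		Properties props = new Properties();
		props.setProperty("group.id", "my-flink-group");          // required
		props.setProperty("zookeeper.connect", "localhost:2181"); // required
		// optional timeouts, defaulting to 6000 ms each:
		props.setProperty("zookeeper.session.timeout.ms", "6000");
		props.setProperty("zookeeper.connection.timeout.ms", "6000");

		ZookeeperOffsetHandler handler = new ZookeeperOffsetHandler(props);
		try {
			// record offset 123 as the last read offset of partition 0 of "myTopic"
			handler.commit(Collections.singletonMap(new TopicPartition("myTopic", 0), 123L));
		}
		finally {
			handler.close();
		}
	}
}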

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java
index 9a20186..218315f 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java
@@ -14,56 +14,87 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors;
 
-import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
-import java.util.Arrays;
+import org.junit.Test;
+
 import java.util.Properties;
 
 
-public class Kafka081ITCase extends KafkaTestBase {
+public class Kafka081ITCase extends KafkaConsumerTestBase {
+	
 	@Override
-	<T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema deserializationSchema, Properties props) {
-		return new TestFlinkKafkaConsumer081<T>(topic, deserializationSchema, props);
+	protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
+		return new FlinkKafkaConsumer081<T>(topic, deserializationSchema, props);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void testCheckpointing() {
+		runCheckpointingTest();
 	}
 
-	@Override
-	long[] getFinalOffsets() {
-		return TestFlinkKafkaConsumer081.finalOffset;
+	@Test
+	public void testOffsetInZookeeper() {
+		runOffsetInZookeeperValidationTest();
+	}
+	
+	@Test
+	public void testConcurrentProducerConsumerTopology() {
+		runSimpleConcurrentProducerConsumerTopology();
 	}
 
-	@Override
-	void resetOffsets() {
-		TestFlinkKafkaConsumer081.finalOffset = null;
+	// --- canceling / failures ---
+	
+	@Test
+	public void testCancelingEmptyTopic() {
+		runCancelingOnEmptyInputTest();
+	}
+
+	@Test
+	public void testCancelingFullTopic() {
+		runCancelingOnFullInputTest();
 	}
 
+	@Test
+	public void testFailOnDeploy() {
+		runFailOnDeployTest();
+	}
 
-	public static class TestFlinkKafkaConsumer081<OUT> extends FlinkKafkaConsumer081<OUT> {
-		public static long[] finalOffset;
-		public TestFlinkKafkaConsumer081(String topicName, DeserializationSchema<OUT> deserializationSchema, Properties consumerConfig) {
-			super(topicName, deserializationSchema, consumerConfig);
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			synchronized (commitedOffsets) {
-				LOG.info("Setting final offset from "+ Arrays.toString(commitedOffsets));
-				if (finalOffset == null) {
-					finalOffset = new long[commitedOffsets.length];
-				}
-				for(int i = 0; i < commitedOffsets.length; i++) {
-					if(commitedOffsets[i] > 0) {
-						if(finalOffset[i] > 0) {
-							throw new RuntimeException("This is unexpected on i = "+i);
-						}
-						finalOffset[i] = commitedOffsets[i];
-					}
-				}
-			}
-		}
+	// --- source to partition mappings and exactly once ---
+	
+	@Test
+	public void testOneToOneSources() {
+		runOneToOneExactlyOnceTest();
 	}
 
+	@Test
+	public void testOneSourceMultiplePartitions() {
+		runOneSourceMultiplePartitionsExactlyOnceTest();
+	}
+
+	@Test
+	public void testMultipleSourcesOnePartition() {
+		runMultipleSourcesOnePartitionExactlyOnceTest();
+	}
+
+	// --- broker failure ---
+
+	@Test
+	public void testBrokerFailure() {
+		runBrokerFailureTest();
+	}
+
+	// --- special executions ---
+	
+	@Test
+	public void testBigRecordJob() {
+		runBigRecordTestTopology();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java
index 43cd0f9..2f80fcb 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java
@@ -14,57 +14,89 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors;
 
-import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
-import java.util.Arrays;
+import org.junit.Ignore;
+import org.junit.Test;
+
 import java.util.Properties;
 
 
-public class Kafka082ITCase extends KafkaTestBase {
+public class Kafka082ITCase extends KafkaConsumerTestBase {
+	
 	@Override
-	<T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema deserializationSchema, Properties props) {
-		return new TestFlinkKafkaConsumer082<T>(topic, deserializationSchema, props);
+	protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
+		return new FlinkKafkaConsumer082<T>(topic, deserializationSchema, props);
 	}
 
-	@Override
-	long[] getFinalOffsets() {
-		return TestFlinkKafkaConsumer082.finalOffset;
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testCheckpointing() {
+		runCheckpointingTest();
 	}
 
-	@Override
-	void resetOffsets() {
-		TestFlinkKafkaConsumer082.finalOffset = null;
+	@Test
+	public void testOffsetInZookeeper() {
+		runOffsetInZookeeperValidationTest();
+	}
+
+	@Test
+	public void testConcurrentProducerConsumerTopology() {
+		runSimpleConcurrentProducerConsumerTopology();
+	}
+
+	// --- canceling / failures ---
+
+	@Test
+	public void testCancelingEmptyTopic() {
+		runCancelingOnEmptyInputTest();
 	}
 
+	@Test
+	public void testCancelingFullTopic() {
+		runCancelingOnFullInputTest();
+	}
 
-	public static class TestFlinkKafkaConsumer082<OUT> extends FlinkKafkaConsumer082<OUT> {
-		private final static Object sync = new Object();
-		public static long[] finalOffset;
-		public TestFlinkKafkaConsumer082(String topicName, DeserializationSchema<OUT> deserializationSchema, Properties consumerConfig) {
-			super(topicName, deserializationSchema, consumerConfig);
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			synchronized (commitedOffsets) {
-				LOG.info("Setting final offset from "+ Arrays.toString(commitedOffsets));
-				if (finalOffset == null) {
-					finalOffset = new long[commitedOffsets.length];
-				}
-				for(int i = 0; i < commitedOffsets.length; i++) {
-					if(commitedOffsets[i] > 0) {
-						if(finalOffset[i] > 0) {
-							throw new RuntimeException("This is unexpected on i = "+i);
-						}
-						finalOffset[i] = commitedOffsets[i];
-					}
-				}
-			}
-		}
+	@Test
+	public void testFailOnDeploy() {
+		runFailOnDeployTest();
 	}
 
+	// --- source to partition mappings and exactly once ---
+
+	@Test
+	public void testOneToOneSources() {
+		runOneToOneExactlyOnceTest();
+	}
+
+	@Test
+	public void testOneSourceMultiplePartitions() {
+		runOneSourceMultiplePartitionsExactlyOnceTest();
+	}
+
+	@Test
+	public void testMultipleSourcesOnePartition() {
+		runMultipleSourcesOnePartitionExactlyOnceTest();
+	}
+
+	// --- broker failure ---
+
+	@Test
+	public void testBrokerFailure() {
+		runBrokerFailureTest();
+	}
+
+	// --- special executions ---
+
+	@Test
+	@Ignore("this does not work with the new consumer")
+	public void testBigRecordJob() {
+		runBigRecordTestTopology();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka083ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka083ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka083ITCase.java
deleted file mode 100644
index f7933f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka083ITCase.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.streaming.connectors.internals.FlinkKafkaConsumerBase;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.junit.Ignore;
-
-import java.util.Arrays;
-import java.util.Properties;
-
-
-public class Kafka083ITCase extends KafkaTestBase {
-	@Override
-	<T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema deserializationSchema, Properties props) {
-		return new TestFlinkKafkaConsumer083<T>(topic, deserializationSchema, props);
-	}
-
-	@Override
-	long[] getFinalOffsets() {
-		return TestFlinkKafkaConsumer083.finalOffset;
-	}
-
-	@Override
-	void resetOffsets() {
-		TestFlinkKafkaConsumer083.finalOffset = null;
-	}
-
-
-	public static class TestFlinkKafkaConsumer083<OUT> extends FlinkKafkaConsumer083<OUT> {
-		public static long[] finalOffset;
-		public TestFlinkKafkaConsumer083(String topicName, DeserializationSchema<OUT> deserializationSchema, Properties consumerConfig) {
-			super(topicName, deserializationSchema, consumerConfig);
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			synchronized (commitedOffsets) {
-				LOG.info("Setting final offset from "+ Arrays.toString(commitedOffsets));
-				if (finalOffset == null) {
-					finalOffset = new long[commitedOffsets.length];
-				}
-				for(int i = 0; i < commitedOffsets.length; i++) {
-					if(commitedOffsets[i] > 0) {
-						if(finalOffset[i] > 0) {
-							throw new RuntimeException("This is unexpected on i = "+i);
-						}
-						finalOffset[i] = commitedOffsets[i];
-					}
-				}
-			}
-		}
-	}
-
-	@Ignore
-	@Override
-	public void brokerFailureTest() throws Exception {
-		// Skipping test: The test is committing the offsets to the Kafka Broker.
-		// only 0.8.3 brokers support that.
-		return;
-	}
-
-	@Ignore
-	@Override
-	public void testFlinkKafkaConsumerWithOffsetUpdates() throws Exception {
-		// Skipping test (see above)
-		return;
-	}
-}
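
The removed 0.8.3 test relied on committing offsets to the Kafka broker, which its own comment notes only 0.8.3 brokers support. As a hedged sketch, assuming the five-argument constructor used in KafkaConsumerTest later in this diff is accessible from this package, the snippet below shows how a consumer is pinned to ZooKeeper-backed offsets and the legacy low-level fetcher, the combination the remaining 0.8.2 tests exercise. The class name OffsetStoreSelectionSketch is hypothetical.

package org.apache.flink.streaming.connectors;

import java.util.Properties;

import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;

public class OffsetStoreSelectionSketch {

	public static FlinkKafkaConsumer<String> zooKeeperBackedConsumer(String topic, Properties props) {
		// offsets are tracked through ZooKeeper, which works against 0.8.x brokers;
		// the legacy low-level fetcher reads the explicitly assigned partitions
		return new FlinkKafkaConsumer<String>(topic, new JavaDefaultStringSchema(), props,
				FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
				FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
	}
}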

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
new file mode 100644
index 0000000..8248cee
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the partition assignment is deterministic, balanced, and stable when the partition set grows.
+ */
+public class KafkaConsumerPartitionAssignmentTest {
+
+	@Test
+	public void testPartitionsEqualConsumers() {
+		try {
+			int[] partitions = {4, 52, 17, 1};
+			
+			for (int i = 0; i < partitions.length; i++) {
+				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+						partitions, "test-topic", partitions.length, i);
+				
+				assertNotNull(parts);
+				assertEquals(1, parts.size());
+				assertTrue(contains(partitions, parts.get(0).partition()));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testMultiplePartitionsPerConsumers() {
+		try {
+			final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+
+			final Set<Integer> allPartitions = new HashSet<>();
+			for (int i : partitions) {
+				allPartitions.add(i);
+			}
+			
+			final int numConsumers = 3;
+			final int minPartitionsPerConsumer = partitions.length / numConsumers;
+			final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1;
+			
+			for (int i = 0; i < numConsumers; i++) {
+				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+						partitions, "test-topic", numConsumers, i);
+
+				assertNotNull(parts);
+				assertTrue(parts.size() >= minPartitionsPerConsumer);
+				assertTrue(parts.size() <= maxPartitionsPerConsumer);
+
+				for (TopicPartition p : parts) {
+					// check that the element was actually contained
+					assertTrue(allPartitions.remove(p.partition()));
+				}
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPartitionsFewerThanConsumers() {
+		try {
+			final int[] partitions = {4, 52, 17, 1};
+
+			final Set<Integer> allPartitions = new HashSet<>();
+			for (int i : partitions) {
+				allPartitions.add(i);
+			}
+
+			final int numConsumers = 2 * partitions.length + 3;
+			
+			for (int i = 0; i < numConsumers; i++) {
+				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+						partitions, "test-topic", numConsumers, i);
+
+				assertNotNull(parts);
+				assertTrue(parts.size() <= 1);
+				
+				for (TopicPartition p : parts) {
+					// check that the element was actually contained
+					assertTrue(allPartitions.remove(p.partition()));
+				}
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testAssignEmptyPartitions() {
+		try {
+			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2);
+			assertNotNull(parts1);
+			assertTrue(parts1.isEmpty());
+
+			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0);
+			assertNotNull(parts2);
+			assertTrue(parts2.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testGrowingPartitionsRemainsStable() {
+		try {
+			final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+			final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7);
+
+			final Set<Integer> allNewPartitions = new HashSet<>();
+			final Set<Integer> allInitialPartitions = new HashSet<>();
+			for (int i : newPartitions) {
+				allNewPartitions.add(i);
+			}
+			for (int i : initialPartitions) {
+				allInitialPartitions.add(i);
+			}
+
+			final int numConsumers = 3;
+			final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers;
+			final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1;
+			final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers;
+			final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1;
+			
+			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(
+					initialPartitions, "test-topic", numConsumers, 0);
+			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(
+					initialPartitions, "test-topic", numConsumers, 1);
+			List<TopicPartition> parts3 = FlinkKafkaConsumer.assignPartitions(
+					initialPartitions, "test-topic", numConsumers, 2);
+
+			assertNotNull(parts1);
+			assertNotNull(parts2);
+			assertNotNull(parts3);
+			
+			assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
+			assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
+			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
+
+			for (TopicPartition p : parts1) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p.partition()));
+			}
+			for (TopicPartition p : parts2) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p.partition()));
+			}
+			for (TopicPartition p : parts3) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p.partition()));
+			}
+			
+			// all partitions must have been assigned
+			assertTrue(allInitialPartitions.isEmpty());
+			
+			// grow the set of partitions and distribute anew
+			
+			List<TopicPartition> parts1new = FlinkKafkaConsumer.assignPartitions(
+					newPartitions, "test-topic", numConsumers, 0);
+			List<TopicPartition> parts2new = FlinkKafkaConsumer.assignPartitions(
+					newPartitions, "test-topic", numConsumers, 1);
+			List<TopicPartition> parts3new = FlinkKafkaConsumer.assignPartitions(
+					newPartitions, "test-topic", numConsumers, 2);
+
+			// new partitions must include all old partitions
+			
+			assertTrue(parts1new.size() > parts1.size());
+			assertTrue(parts2new.size() > parts2.size());
+			assertTrue(parts3new.size() > parts3.size());
+			
+			assertTrue(parts1new.containsAll(parts1));
+			assertTrue(parts2new.containsAll(parts2));
+			assertTrue(parts3new.containsAll(parts3));
+
+			assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
+			assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
+			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
+
+			for (TopicPartition p : parts1new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p.partition()));
+			}
+			for (TopicPartition p : parts2new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p.partition()));
+			}
+			for (TopicPartition p : parts3new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p.partition()));
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allNewPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static boolean contains(int[] array, int value) {
+		for (int i : array) {
+			if (i == value) {
+				return true;
+			}
+		}
+		return false;
+	}
+}
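
The invariants verified above, balanced sizes, disjoint full coverage, and stability when partitions are appended, can all be met by a simple index-based round robin. The sketch below is one assignment rule that satisfies every property the test checks; it is not necessarily the exact logic inside FlinkKafkaConsumer.assignPartitions, and the class name PartitionAssignmentSketch is hypothetical.

package org.apache.flink.streaming.connectors;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.kafka_backport.common.TopicPartition;

public class PartitionAssignmentSketch {

	public static List<TopicPartition> assignPartitions(
			int[] partitions, String topic, int numConsumers, int consumerIndex) {

		List<TopicPartition> assigned = new ArrayList<TopicPartition>();

		// index-based round robin: consumer i takes every numConsumers-th entry of the
		// partition array, so sizes differ by at most one and appending new partitions
		// never reassigns the ones that were already handed out
		for (int i = 0; i < partitions.length; i++) {
			if (i % numConsumers == consumerIndex) {
				assigned.add(new TopicPartition(topic, partitions[i]));
			}
		}
		return assigned;
	}
}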

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
new file mode 100644
index 0000000..4949714
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerConfig;
+
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class KafkaConsumerTest {
+
+	@Test
+	public void testValidateZooKeeperConfig() {
+		try {
+			// empty
+			Properties emptyProperties = new Properties();
+			try {
+				FlinkKafkaConsumer.validateZooKeeperConfig(emptyProperties);
+				fail("should fail with an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+
+			// no connect string (only group string)
+			Properties noConnect = new Properties();
+			noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
+			try {
+				FlinkKafkaConsumer.validateZooKeeperConfig(noConnect);
+				fail("should fail with an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+
+			// no group string (only connect string)
+			Properties noGroup = new Properties();
+			noGroup.put("zookeeper.connect", "localhost:47574");
+			try {
+				FlinkKafkaConsumer.validateZooKeeperConfig(noGroup);
+				fail("should fail with an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSnapshot() {
+		try {
+			Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets");
+			Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running");
+			Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
+			
+			offsetsField.setAccessible(true);
+			runningField.setAccessible(true);
+			mapField.setAccessible(true);
+
+			FlinkKafkaConsumer<?> consumer = mock(FlinkKafkaConsumer.class);
+			when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+			
+			long[] testOffsets = new long[] { 43, 6146, 133, 16, 162, 616 };
+			LinkedMap map = new LinkedMap();
+			
+			offsetsField.set(consumer, testOffsets);
+			runningField.set(consumer, true);
+			mapField.set(consumer, map);
+			
+			assertTrue(map.isEmpty());
+
+			// make multiple checkpoints
+			for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
+				long[] checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
+				assertArrayEquals(testOffsets, checkpoint);
+				
+				// change the offsets, make sure the snapshot did not change
+				long[] checkpointCopy = Arrays.copyOf(checkpoint, checkpoint.length);
+				
+				for (int i = 0; i < testOffsets.length; i++) {
+					testOffsets[i] += 1L;
+				}
+				
+				assertArrayEquals(checkpointCopy, checkpoint);
+				
+				assertTrue(map.size() > 0);
+				assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	@Ignore("Kafka consumer internally runs into an infinite loop")
+	public void testCreateSourceWithoutCluster() {
+		try {
+			Properties props = new Properties();
+			props.setProperty("zookeeper.connect", "localhost:56794");
+			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
+			props.setProperty("group.id", "non-existent-group");
+			
+			new FlinkKafkaConsumer<String>("no op topic", new JavaDefaultStringSchema(), props,
+					FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
+					FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}
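
The contract that testSnapshot pins down can be summarized in a short sketch: snapshotState returns a defensive copy of the current offsets, remembers it under the checkpoint id in pendingCheckpoints, and keeps that map bounded by MAX_NUM_PENDING_CHECKPOINTS. The class below is an assumption-labeled illustration of that contract, not Flink's implementation; the class name SnapshotContractSketch and the bound of 100 are placeholders.

package org.apache.flink.streaming.connectors;

import java.util.Arrays;

import org.apache.commons.collections.map.LinkedMap;

public class SnapshotContractSketch {

	// placeholder value; the real constant lives in FlinkKafkaConsumer
	private static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

	private final LinkedMap pendingCheckpoints = new LinkedMap();

	private long[] lastOffsets = new long[0];

	public long[] snapshotState(long checkpointId, long checkpointTimestamp) {
		// return a copy, so that later offset updates cannot mutate the snapshot
		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);

		// remember the snapshot until the checkpoint is acknowledged, evicting the
		// oldest entry so the map never exceeds the configured bound
		pendingCheckpoints.put(checkpointId, currentOffsets);
		while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
			pendingCheckpoints.remove(0);
		}
		return currentOffsets;
	}
}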