Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:51 UTC

[34/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Remove copied Kafka code again. Implemented our own topic metadata retrieval.

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
new file mode 100644
index 0000000..00067ea
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import kafka.api.FetchRequestBuilder;
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * This fetcher uses Kafka's low-level API to pull data from a specific
+ * set of partitions and offsets for a certain topic.
+ * 
+ * <p>This code is partly based on the tutorial code for Kafka's low-level consumer API.</p>
+ */
+public class LegacyFetcher implements Fetcher {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
+
+	/** The topic from which this fetcher pulls data */
+	private final String topic;
+	
+	/** The properties that configure the Kafka connection */
+	private final Properties config;
+	
+	/** The task name, to give more readable names to the spawned threads */
+	private final String taskName;
+	
+	/** The first error that occurred in a connection thread */
+	private final AtomicReference<Throwable> error;
+
+	/** The partitions that the fetcher should read, with their starting offsets */
+	private Map<TopicPartition, Long> partitionsToRead;
+	
+	/** Reference to the thread that executed the run() method. */
+	private volatile Thread mainThread;
+	
+	/** Flag to shut the fetcher down */
+	private volatile boolean running = true;
+
+	public LegacyFetcher(String topic, Properties props, String taskName) {
+		this.config = checkNotNull(props, "The config properties cannot be null");
+		this.topic = checkNotNull(topic, "The topic cannot be null");
+		this.taskName = taskName;
+		this.error = new AtomicReference<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Fetcher methods
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void setPartitionsToRead(List<TopicPartition> partitions) {
+		partitionsToRead = new HashMap<>(partitions.size());
+		for (TopicPartition tp: partitions) {
+			partitionsToRead.put(tp, FlinkKafkaConsumer.OFFSET_NOT_SET);
+		}
+	}
+
+	@Override
+	public void seek(TopicPartition topicPartition, long offsetToRead) {
+		if (partitionsToRead == null) {
+			throw new IllegalArgumentException("The partitions to read have not been set");
+		}
+		if (!partitionsToRead.containsKey(topicPartition)) {
+			throw new IllegalArgumentException("Cannot set offset for a partition (" + topicPartition
+					+ ") that this fetcher is not going to read. Partitions to read: " + partitionsToRead);
+		}
+		partitionsToRead.put(topicPartition, offsetToRead);
+	}
+	
+	@Override
+	public void close() {
+		// this flag needs to be checked by the run() method that creates the spawned threads
+		this.running = false;
+		
+		// all other cleanup is done by the run() method itself
+	}
+
+	@Override
+	public <T> void run(SourceFunction.SourceContext<T> sourceContext, 
+						DeserializationSchema<T> valueDeserializer,
+						long[] lastOffsets) throws Exception {
+		
+		if (partitionsToRead == null || partitionsToRead.size() == 0) {
+			throw new IllegalArgumentException("No partitions set");
+		}
+		
+		// NOTE: This method needs to always release all resources it acquires
+		
+		this.mainThread = Thread.currentThread();
+
+		LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher");
+		
+		// get lead broker for each partition
+		
+		// NOTE: The Kafka client apparently locks itself in an infinite loop sometimes
+		// when it is interrupted, so we run it only in a separate thread.
+		// Since it sometimes refuses to shut down, we resort to the admittedly harsh
+		// means of killing the thread after a timeout.
+		PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topic, config);
+		infoFetcher.start();
+		
+		KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
+		watchDog.start();
+		
+		final List<PartitionInfo> allPartitionsInTopic = infoFetcher.getPartitions();
+		
+		// map each lead broker to the partitions that will be fetched from it
+		int fetchPartitionsCount = 0;
+		Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<>();
+		
+		for (PartitionInfo partitionInfo : allPartitionsInTopic) {
+			if (partitionInfo.leader() == null) {
+				throw new RuntimeException("Unable to consume partition " + partitionInfo.partition()
+						+ " from topic "+partitionInfo.topic()+" because it does not have a leader");
+			}
+			
+			for (Map.Entry<TopicPartition, Long> entry : partitionsToRead.entrySet()) {
+				final TopicPartition topicPartition = entry.getKey();
+				final long offset = entry.getValue();
+				
+				// check if that partition is for us
+				if (topicPartition.partition() == partitionInfo.partition()) {
+					List<FetchPartition> partitions = fetchBrokers.get(partitionInfo.leader());
+					if (partitions == null) {
+						partitions = new ArrayList<>();
+						fetchBrokers.put(partitionInfo.leader(), partitions);
+					}
+					
+					partitions.add(new FetchPartition(topicPartition.partition(), offset));
+					fetchPartitionsCount++;
+					
+				}
+				// else this partition is not for us
+			}
+		}
+		
+		if (partitionsToRead.size() != fetchPartitionsCount) {
+			throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only "
+					+ fetchPartitionsCount + " partition infos with lead brokers.");
+		}
+
+		// create SimpleConsumers for each broker
+		ArrayList<SimpleConsumerThread<?>> consumers = new ArrayList<>(fetchBrokers.size());
+		
+		for (Map.Entry<Node, List<FetchPartition>> brokerInfo : fetchBrokers.entrySet()) {
+			final Node broker = brokerInfo.getKey();
+			final List<FetchPartition> partitionsList = brokerInfo.getValue();
+			
+			FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
+
+			SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config, topic,
+					broker, partitions, sourceContext, valueDeserializer, lastOffsets);
+
+			thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
+					taskName, broker.id(), broker.host(), broker.port()));
+			thread.setDaemon(true);
+			consumers.add(thread);
+		}
+		
+		// last check whether we should abort.
+		if (!running) {
+			return;
+		}
+		
+		// start all consumer threads
+		for (SimpleConsumerThread<?> t : consumers) {
+			LOG.info("Starting thread {}", t.getName());
+			t.start();
+		}
+		
+		// wait until all consumer threads are done, or until we are aborted, or until
+		// an error occurred in one of the fetcher threads
+		try {
+			boolean someConsumersRunning = true;
+			while (running && error.get() == null && someConsumersRunning) {
+				try {
+					// wait for the consumer threads. if an error occurs, we are interrupted
+					for (SimpleConsumerThread<?> t : consumers) {
+						t.join();
+					}
+	
+					// safety net
+					someConsumersRunning = false;
+					for (SimpleConsumerThread<?> t : consumers) {
+						someConsumersRunning |= t.isAlive();
+					}
+				}
+				catch (InterruptedException e) {
+					// ignore. we should notice what happened in the next loop check
+				}
+			}
+			
+			// make sure any asynchronous error is noticed
+			Throwable error = this.error.get();
+			if (error != null) {
+				throw new Exception(error.getMessage(), error);
+			}
+		}
+		finally {
+			// make sure that in any case (completion, abort, error), all spawned threads are stopped
+			for (SimpleConsumerThread<?> t : consumers) {
+				if (t.isAlive()) {
+					t.cancel();
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Reports an error from a fetch thread. This will cause the main thread to see this error,
+	 * abort, and cancel all other fetch threads.
+	 * 
+	 * @param error The error to report.
+	 */
+	void onErrorInFetchThread(Throwable error) {
+		if (this.error.compareAndSet(null, error)) {
+			// we are the first to report an error
+			if (mainThread != null) {
+				mainThread.interrupt();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Representation of a partition to fetch.
+	 */
+	private static class FetchPartition {
+		
+		/** ID of the partition within the topic (0 indexed, as given by Kafka) */
+		int partition;
+		
+		/** Offset pointing at the next element to read from that partition. */
+		long nextOffsetToRead;
+
+		FetchPartition(int partition, long nextOffsetToRead) {
+			this.partition = partition;
+			this.nextOffsetToRead = nextOffsetToRead;
+		}
+		
+		@Override
+		public String toString() {
+			return "FetchPartition {partition=" + partition + ", offset=" + nextOffsetToRead + '}';
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Per broker fetcher
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Each broker needs its own connection. This thread implements the connection to
+	 * one broker. The connection can fetch multiple partitions from the broker.
+	 * 
+	 * @param <T> The data type fetched.
+	 */
+	private static class SimpleConsumerThread<T> extends Thread {
+		
+		private final SourceFunction.SourceContext<T> sourceContext;
+		private final DeserializationSchema<T> valueDeserializer;
+		private final long[] offsetsState;
+		
+		private final FetchPartition[] partitions;
+		
+		private final Node broker;
+		private final String topic;
+		private final Properties config;
+
+		private final LegacyFetcher owner;
+
+		private SimpleConsumer consumer;
+		
+		private volatile boolean running = true;
+
+
+		// exceptions are thrown locally
+		public SimpleConsumerThread(LegacyFetcher owner,
+									Properties config, String topic,
+									Node broker,
+									FetchPartition[] partitions,
+									SourceFunction.SourceContext<T> sourceContext,
+									DeserializationSchema<T> valueDeserializer,
+									long[] offsetsState) {
+			this.owner = owner;
+			this.config = config;
+			this.topic = topic;
+			this.broker = broker;
+			this.partitions = partitions;
+			this.sourceContext = checkNotNull(sourceContext);
+			this.valueDeserializer = checkNotNull(valueDeserializer);
+			this.offsetsState = checkNotNull(offsetsState);
+		}
+
+		@Override
+		public void run() {
+			try {
+				// set up the config values
+				final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
+
+				// these are the actual configuration values of Kafka + their original default values.
+				final int soTimeout = Integer.valueOf(config.getProperty("socket.timeout.ms", "30000"));
+				final int bufferSize = Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536"));
+				final int fetchSize = Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576"));
+				final int maxWait = Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100"));
+				final int minBytes = Integer.valueOf(config.getProperty("fetch.min.bytes", "1"));
+				
+				// create the Kafka consumer that we actually use for fetching
+				consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+
+				// make sure that all partitions have some offsets to start with
+				// those partitions that do not have an offset from a checkpoint (or from ZooKeeper)
+				// need to get their start offset from the Kafka broker, via an offset request
+				
+				List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
+
+				for (FetchPartition fp : partitions) {
+					if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
+						// retrieve the offset from the consumer
+						partitionsToGetOffsetsFor.add(fp);
+					}
+				}
+				if (partitionsToGetOffsetsFor.size() > 0) {
+					long timeType;
+					if (config.getProperty("auto.offset.reset", "latest").equals("latest")) {
+						timeType = OffsetRequest.LatestTime();
+					} else {
+						timeType = OffsetRequest.EarliestTime();
+					}
+					getLastOffset(consumer, topic, partitionsToGetOffsetsFor, timeType);
+					LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
+							topic, partitionsToGetOffsetsFor);
+				}
+				
+				// Now, the actual work starts :-)
+				
+				while (running) {
+					FetchRequestBuilder frb = new FetchRequestBuilder();
+					frb.clientId(clientId);
+					frb.maxWait(maxWait);
+					frb.minBytes(minBytes);
+					
+					for (FetchPartition fp : partitions) {
+						frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, fetchSize);
+					}
+					kafka.api.FetchRequest fetchRequest = frb.build();
+					LOG.debug("Issuing fetch request {}", fetchRequest);
+
+					FetchResponse fetchResponse;
+					fetchResponse = consumer.fetch(fetchRequest);
+
+					if (fetchResponse.hasError()) {
+						String exception = "";
+						for (FetchPartition fp : partitions) {
+							short code;
+							if ((code = fetchResponse.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
+								exception += "\nException for partition " + fp.partition + ": " + 
+										StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+							}
+						}
+						throw new IOException("Error while fetching from broker: " + exception);
+					}
+
+					int messagesInFetch = 0;
+					for (FetchPartition fp : partitions) {
+						final ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition);
+						final int partition = fp.partition;
+						
+						for (MessageAndOffset msg : messageSet) {
+							if (running) {
+								messagesInFetch++;
+								if (msg.offset() < fp.nextOffsetToRead) {
+									// we have seen this message already
+									LOG.info("Skipping message with offset " + msg.offset()
+											+ " because we have seen messages until " + fp.nextOffsetToRead
+											+ " from partition " + fp.partition + " already");
+									continue;
+								}
+								
+								ByteBuffer payload = msg.message().payload();
+								byte[] valueByte = new byte[payload.remaining()];
+								payload.get(valueByte);
+								
+								final T value = valueDeserializer.deserialize(valueByte);
+								final long offset = msg.offset();
+										
+								synchronized (sourceContext.getCheckpointLock()) {
+									sourceContext.collect(value);
+									offsetsState[partition] = offset;
+								}
+								
+								// advance offset for the next request
+								fp.nextOffsetToRead = offset + 1;
+							}
+							else {
+								// no longer running
+								return;
+							}
+						}
+					}
+					LOG.debug("This fetch contained {} messages", messagesInFetch);
+				}
+			}
+			catch (Throwable t) {
+				// report to the main thread
+				owner.onErrorInFetchThread(t);
+			}
+			finally {
+				// end of run loop. close connection to consumer
+				if (consumer != null) {
+					// closing the consumer should not fail the program
+					try {
+						consumer.close();
+					}
+					catch (Throwable t) {
+						LOG.error("Error while closing the Kafka simple consumer", t);
+					}
+				}
+			}
+		}
+
+		/**
+		 * Cancels this fetch thread. The thread will release all resources and terminate.
+		 */
+		public void cancel() {
+			this.running = false;
+			
+			// interrupt whatever the consumer is doing
+			if (consumer != null) {
+				consumer.close();
+			}
+			
+			this.interrupt();
+		}
+
+		/**
+		 * Requests starting offsets for a set of partitions from the lead broker,
+		 * for the given time type (latest or earliest).
+		 *
+		 * @param consumer The consumer connected to the lead broker
+		 * @param topic The topic name
+		 * @param partitions The list of partitions we need offsets for
+		 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (see kafka.api.OffsetRequest)
+		 */
+		private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
+
+			Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+			for (FetchPartition fp: partitions) {
+				TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
+				requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+			}
+
+			kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+			OffsetResponse response = consumer.getOffsetsBefore(request);
+
+			if (response.hasError()) {
+				String exception = "";
+				for (FetchPartition fp: partitions) {
+					short code;
+					if ((code = response.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
+						exception += "\nException for partition " + fp.partition + ": "
+								+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+					}
+				}
+				throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions
+						+ ". " + exception);
+			}
+
+			for (FetchPartition fp: partitions) {
+				// the resulting offset is the next offset we are going to read
+				// for not-yet-consumed partitions, it is 0.
+				fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
+			}
+		}
+	}
+	
+	private static class PartitionInfoFetcher extends Thread {
+
+		private final String topic;
+		private final Properties properties;
+		
+		private volatile List<PartitionInfo> result;
+		private volatile Throwable error;
+
+		
+		PartitionInfoFetcher(String topic, Properties properties) {
+			this.topic = topic;
+			this.properties = properties;
+		}
+
+		@Override
+		public void run() {
+			try {
+				result = FlinkKafkaConsumer.getPartitionsForTopic(topic, properties);
+			}
+			catch (Throwable t) {
+				this.error = t;
+			}
+		}
+		
+		public List<PartitionInfo> getPartitions() throws Exception {
+			try {
+				this.join();
+			}
+			catch (InterruptedException e) {
+				throw new Exception("Partition fetching was cancelled before completion");
+			}
+			
+			if (error != null) {
+				throw new Exception("Failed to fetch partitions for topic " + topic, error);
+			}
+			if (result != null) {
+				return result;
+			}
+			throw new Exception("Partition fetching failed");
+		}
+	}
+
+	private static class KillerWatchDog extends Thread {
+		
+		private final Thread toKill;
+		private final long timeout;
+
+		private KillerWatchDog(Thread toKill, long timeout) {
+			super("KillerWatchDog");
+			setDaemon(true);
+			
+			this.toKill = toKill;
+			this.timeout = timeout;
+		}
+
+		@SuppressWarnings("deprecation")
+		@Override
+		public void run() {
+			final long deadline = System.currentTimeMillis() + timeout;
+			long now;
+			
+			while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
+				try {
+					toKill.join(deadline - now);
+				}
+				catch (InterruptedException e) {
+					// ignore here, our job is important!
+				}
+			}
+			
+			// this is harsh, but this watchdog is a last resort
+			if (toKill.isAlive()) {
+				toKill.stop();
+			}
+		}
+	}
+}
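
A minimal, hedged sketch of how this fetcher is driven, using only the methods shown above (setPartitionsToRead, seek, run). Inside Flink this wiring is done by the FlinkKafkaConsumer; the topic name, properties, partition number, offset, and the SourceContext parameter below are illustrative assumptions, not part of the commit.

package org.apache.flink.streaming.connectors.internals;

import java.util.Collections;
import java.util.Properties;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.kafka.common.TopicPartition;

class LegacyFetcherUsageSketch {

	// The SourceContext is normally provided by the Flink runtime; it is a
	// parameter here because this sketch lives outside a Flink job.
	static void consume(SourceFunction.SourceContext<String> sourceContext) throws Exception {
		Properties props = new Properties();
		props.setProperty("group.id", "sketch-group");             // assumed value
		props.setProperty("zookeeper.connect", "localhost:2181");  // assumed value

		LegacyFetcher fetcher = new LegacyFetcher("sketch-topic", props, "sketch-task");

		// declare the partitions this subtask owns (starting offsets initially unset)
		TopicPartition partition0 = new TopicPartition("sketch-topic", 0);
		fetcher.setPartitionsToRead(Collections.singletonList(partition0));

		// optionally position a partition at a restored offset (the next offset to read)
		fetcher.seek(partition0, 42L);

		// run() blocks: it emits deserialized records into the source context and
		// records the last emitted offset per partition, under the checkpoint lock
		long[] lastOffsets = new long[] { FlinkKafkaConsumer.OFFSET_NOT_SET };
		fetcher.run(sourceContext, new JavaDefaultStringSchema(), lastOffsets);
	}
}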

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
new file mode 100644
index 0000000..e1fe702
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The offset handler is responsible for locating the initial partition offsets 
+ * where the source should start reading, as well as committing offsets from completed
+ * checkpoints.
+ */
+public interface OffsetHandler {
+
+	/**
+	 * Commits the given offsets for the partitions. May commit the offsets to the Kafka broker,
+	 * or to ZooKeeper, based on its configured behavior.
+	 *
+	 * @param offsetsToCommit The offsets to commit, one per partition.
+	 */
+	void commit(Map<TopicPartition, Long> offsetsToCommit) throws Exception;
+
+	/**
+	 * Positions the given fetcher at the initial read offsets from which stream
+	 * consumption will start.
+	 * 
+	 * @param partitions The partitions for which to seek the fetcher to the initial offsets.
+	 * @param fetcher The fetcher that will pull data from Kafka and must be positioned.
+	 */
+	void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) throws Exception;
+
+	/**
+	 * Closes the offset handler, releasing all resources.
+	 * 
+	 * @throws IOException Thrown if closing the handler fails.
+	 */
+	void close() throws IOException;
+}
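
A short, hedged sketch of how an OffsetHandler cooperates with a Fetcher, following the contract above: seek the fetcher to the stored offsets before it starts, commit the offsets of completed checkpoints later, and close the handler at the end. The ZookeeperOffsetHandler, the topic/partition, and the offset value are illustrative assumptions.

package org.apache.flink.streaming.connectors.internals;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.TopicPartition;

class OffsetHandlerUsageSketch {

	// Hedged sketch -- 'props' must contain 'group.id' and 'zookeeper.connect';
	// the fetcher is assumed to already know its partitions (setPartitionsToRead).
	static void positionAndCommit(Fetcher fetcher, Properties props) throws Exception {
		OffsetHandler offsetHandler = new ZookeeperOffsetHandler(props);

		List<TopicPartition> partitions =
				Collections.singletonList(new TopicPartition("sketch-topic", 0));  // assumed partition

		// before the fetcher starts, seek it to the offsets stored for this consumer group
		offsetHandler.seekFetcherToInitialOffsets(partitions, fetcher);

		// ... fetcher.run(...) happens elsewhere; once a checkpoint completes,
		// the offsets contained in that checkpoint are committed back
		Map<TopicPartition, Long> checkpointedOffsets =
				Collections.singletonMap(new TopicPartition("sketch-topic", 0), 42L);
		offsetHandler.commit(checkpointedOffsets);

		offsetHandler.close();
	}
}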

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
new file mode 100644
index 0000000..a6417a7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * Simple ZooKeeper serializer for Strings.
+ */
+public class ZooKeeperStringSerializer implements ZkSerializer {
+
+	private static final Charset CHARSET = Charset.forName("UTF-8");
+	
+	@Override
+	public byte[] serialize(Object data) {
+		if (data instanceof String) {
+			return ((String) data).getBytes(CHARSET);
+		}
+		else {
+			throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
+		}
+	}
+
+	@Override
+	public Object deserialize(byte[] bytes) {
+		if (bytes == null) {
+			return null;
+		}
+		else {
+			return new String(bytes, CHARSET);
+		}
+	}
+}
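
A brief, hedged sketch of plugging this serializer into a ZkClient; the constructor call mirrors the one in ZookeeperOffsetHandler below, and the connection string, timeouts, and znode path are illustrative assumptions.

package org.apache.flink.streaming.connectors.internals;

import org.I0Itec.zkclient.ZkClient;

class ZooKeeperStringSerializerSketch {

	public static void main(String[] args) {
		// connection values are assumed; the serializer makes ZkClient store znode data as UTF-8 strings
		ZkClient zkClient = new ZkClient("localhost:2181", 6000, 6000,
				new ZooKeeperStringSerializer());

		zkClient.createPersistent("/flink-sketch", "hello");  // goes through serialize(Object)
		String value = zkClient.readData("/flink-sketch");    // goes through deserialize(byte[])
		System.out.println(value);

		zkClient.close();
	}
}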

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
new file mode 100644
index 0000000..d909d5a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import kafka.common.TopicAndPartition;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class ZookeeperOffsetHandler implements OffsetHandler {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
+	
+	private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
+	
+	
+	private final ZkClient zkClient;
+	
+	private final String groupId;
+
+	
+	public ZookeeperOffsetHandler(Properties props) {
+		this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+		
+		if (this.groupId == null) {
+			throw new IllegalArgumentException("Required property '"
+					+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
+		}
+		
+		String zkConnect = props.getProperty("zookeeper.connect");
+		if (zkConnect == null) {
+			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
+		}
+		
+		zkClient = new ZkClient(zkConnect,
+				Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
+				Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
+				new ZooKeeperStringSerializer());
+	}
+
+
+	@Override
+	public void commit(Map<TopicPartition, Long> offsetsToCommit) {
+		for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
+			TopicPartition tp = entry.getKey();
+			long offset = entry.getValue();
+			
+			if (offset >= 0) {
+				setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset);
+			}
+		}
+	}
+
+	@Override
+	public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
+		for (TopicPartition tp : partitions) {
+			long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition());
+
+			if (offset != OFFSET_NOT_SET) {
+				LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
+						tp.partition(), offset);
+
+				// the offset in ZooKeeper is the last read offset; seek() expects the next offset to read
+				fetcher.seek(tp, offset + 1);
+			}
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		zkClient.close();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Communication with Zookeeper
+	// ------------------------------------------------------------------------
+	
+	public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
+		TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+		ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
+	}
+
+	public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
+		TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+
+		scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient,
+				topicDirs.consumerOffsetDir() + "/" + tap.partition());
+
+		if (data._1().isEmpty()) {
+			return OFFSET_NOT_SET;
+		} else {
+			return Long.valueOf(data._1().get());
+		}
+	}
+}
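
For orientation, a hedged sketch of the static helpers above. With the ZKGroupTopicDirs layout, the offset for group "g", topic "t", partition 0 ends up under a znode along the lines of /consumers/g/offsets/t/0; the connection settings and values below are illustrative assumptions.

package org.apache.flink.streaming.connectors.internals;

import org.I0Itec.zkclient.ZkClient;

import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;

class ZookeeperOffsetSketch {

	public static void main(String[] args) {
		ZkClient zkClient = new ZkClient("localhost:2181", 6000, 6000,
				new ZooKeeperStringSerializer());  // connection values are assumed

		// store the last read offset for group "g", topic "t", partition 0
		ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, "g", "t", 0, 42L);

		// read it back; OFFSET_NOT_SET is returned when nothing is stored for the partition
		long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, "g", "t", 0);
		System.out.println(offset == FlinkKafkaConsumer.OFFSET_NOT_SET ? "no offset stored" : "offset: " + offset);

		zkClient.close();
	}
}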

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
deleted file mode 100644
index fe6684d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-
-public class KafkaConsumerExample {
-
-	private static String host;
-	private static int port;
-	private static String topic;
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
-
-		DataStream<String> kafkaStream = env
-				.addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
-		kafkaStream.print();
-
-		env.execute();
-	}
-
-	private static boolean parseParameters(String[] args) {
-		if (args.length == 3) {
-			host = args[0];
-			port = Integer.parseInt(args[1]);
-			topic = args[2];
-			return true;
-		} else {
-			System.err.println("Usage: KafkaConsumerExample <host> <port> <topic>");
-			return false;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
deleted file mode 100644
index f241d1c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-
-@SuppressWarnings("serial")
-public class KafkaProducerExample {
-
-	public static void main(String[] args) throws Exception {
-		
-		if (args.length < 3) {
-			System.err.println("Usage: KafkaProducerExample <host> <port> <topic>");
-			return;
-		}
-
-		final String host = args[0];
-		final int port = Integer.parseInt(args[1]);
-		final String topic = args[2];
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
-		
-		env.addSource(new SourceFunction<String>() {
-			
-			private volatile boolean running = true;
-			
-			@Override
-			public void run(SourceContext<String> ctx) throws Exception {
-				for (int i = 0; i < 20 && running; i++) {
-					ctx.collect("message #" + i);
-					Thread.sleep(100L);
-				}
-
-				ctx.collect("q");
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-
-
-		})
-			.addSink(new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
-				.setParallelism(3);
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 0965b29..ead24f3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -14,180 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.streaming.connectors.kafka.api;
 
-import java.util.Map;
-import java.util.Properties;
 
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
-import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.DefaultEncoder;
-
 
 /**
  * Sink that emits its inputs to a Kafka topic.
  *
- * @param <IN>
- * 		Type of the sink input
+ * The KafkaSink has been relocated to org.apache.flink.streaming.connectors.KafkaSink.
+ * This class will be removed in future releases of Flink.
  */
-public class KafkaSink<IN> extends RichSinkFunction<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
-
-	private Producer<IN, byte[]> producer;
-	private Properties userDefinedProperties;
-	private String topicId;
-	private String brokerList;
-	private SerializationSchema<IN, byte[]> schema;
-	private SerializableKafkaPartitioner partitioner;
-	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
-
-	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param brokerList
-	 *			Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 */
-	public KafkaSink(String brokerList, String topicId,
-			SerializationSchema<IN, byte[]> serializationSchema) {
-		this(brokerList, topicId, new Properties(), serializationSchema);
+@Deprecated
+public class KafkaSink<IN> extends org.apache.flink.streaming.connectors.KafkaSink<IN> {
+	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
+		super(brokerList, topicId, serializationSchema);
 	}
-
-	/**
-	 * Creates a KafkaSink for a given topic with custom Producer configuration.
-	 * If you use this constructor, the broker should be set with the "metadata.broker.list"
-	 * configuration.
-	 *
-	 * @param brokerList
-	 * 		Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param producerConfig
-	 * 		Configurations of the Kafka producer
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 */
-	public KafkaSink(String brokerList, String topicId, Properties producerConfig,
-			SerializationSchema<IN, byte[]> serializationSchema) {
-		String[] elements = brokerList.split(",");
-		for(String broker: elements) {
-			NetUtils.ensureCorrectHostnamePort(broker);
-		}
-		Preconditions.checkNotNull(topicId, "TopicID not set");
-
-		this.brokerList = brokerList;
-		this.topicId = topicId;
-		this.schema = serializationSchema;
-		this.partitionerClass = null;
-		this.userDefinedProperties = producerConfig;
-	}
-
-	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param brokerList
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 * @param partitioner
-	 * 		User defined partitioner.
-	 */
-	public KafkaSink(String brokerList, String topicId,
-			SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
-		this(brokerList, topicId, serializationSchema);
-		ClosureCleaner.ensureSerializable(partitioner);
-		this.partitioner = partitioner;
-	}
-
-	public KafkaSink(String brokerList,
-			String topicId,
-			SerializationSchema<IN, byte[]> serializationSchema,
-			Class<? extends SerializableKafkaPartitioner> partitioner) {
-		this(brokerList, topicId, serializationSchema);
-		this.partitionerClass = partitioner;
-	}
-
-	/**
-	 * Initializes the connection to Kafka.
-	 */
-	@Override
-	public void open(Configuration configuration) {
-
-		Properties properties = new Properties();
-
-		properties.put("metadata.broker.list", brokerList);
-		properties.put("request.required.acks", "-1");
-		properties.put("message.send.max.retries", "10");
-
-		properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
-
-		// this will not be used as the key will not be serialized
-		properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
-
-		for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
-			properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
-		}
-
-		if (partitioner != null) {
-			properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
-			// java serialization will do the rest.
-			properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
-		}
-		if (partitionerClass != null) {
-			properties.put("partitioner.class", partitionerClass);
-		}
-
-		ProducerConfig config = new ProducerConfig(properties);
-
-		try {
-			producer = new Producer<IN, byte[]>(config);
-		} catch (NullPointerException e) {
-			throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
-		}
-	}
-
-	/**
-	 * Called when new data arrives to the sink, and forwards it to Kafka.
-	 *
-	 * @param next
-	 * 		The incoming data
-	 */
-	@Override
-	public void invoke(IN next) {
-		byte[] serialized = schema.serialize(next);
-
-		// Sending message without serializable key.
-		producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
-	}
-
-	@Override
-	public void close() {
-		if (producer != null) {
-			producer.close();
-		}
-	}
-
 }
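
Since the old class now only delegates to the relocated one, migrating is essentially an import change. A hedged sketch, assuming the relocated class exposes the same (brokerList, topicId, serializationSchema) constructor shown above; the broker address, topic, and sample data are illustrative assumptions.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;

public class KafkaSinkMigrationSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

		DataStream<String> stream = env.fromElements("a", "b", "c");

		// deprecated location (still compiles, delegates to the new class):
		//   org.apache.flink.streaming.connectors.kafka.api.KafkaSink
		// relocated class, same constructor arguments:
		stream.addSink(new org.apache.flink.streaming.connectors.KafkaSink<String>(
				"localhost:9092", "sketch-topic", new JavaDefaultStringSchema()));

		env.execute();
	}
}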

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
deleted file mode 100644
index 3bcbfa7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import com.google.common.base.Preconditions;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.ConnectorSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Source that listens to a Kafka topic using the high level Kafka API.
- * 
- * <p><b>IMPORTANT:</b> This source is not participating in the checkpointing procedure
- * and hence gives no form of processing guarantees.
- * Use the {@link org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource}
- * for a fault tolerant source that provides "exactly once" processing guarantees when used with
- * checkpointing enabled.</p>
- *
- * @param <OUT>
- *            Type of the messages on the topic.
- */
-public class KafkaSource<OUT> extends ConnectorSource<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
-	
-	private static final String DEFAULT_GROUP_ID = "flink-group";
-	private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
-
-	private final String zookeeperAddress;
-	private final String groupId;
-	private final String topicId;
-	private final Properties customProperties;
-	private final long zookeeperSyncTimeMillis;
-	
-	private transient ConsumerConnector consumer;
-	private transient ConsumerIterator<byte[], byte[]> consumerIterator;
-	
-	private volatile boolean isRunning;
-
-	/**
-	 * Creates a KafkaSource that consumes a topic.
-	 *
-	 * @param zookeeperAddress
-	 *            Address of the Zookeeper host (with port number).
-	 * @param topicId
-	 *            ID of the Kafka topic.
-	 * @param groupId
-	 * 			   ID of the consumer group.
-	 * @param deserializationSchema
-	 *            User defined deserialization schema.
-	 * @param zookeeperSyncTimeMillis
-	 *            Synchronization time with zookeeper.
-	 */
-	public KafkaSource(String zookeeperAddress,
-		String topicId,
-		String groupId,
-		DeserializationSchema<OUT> deserializationSchema,
-		long zookeeperSyncTimeMillis) {
-		this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
-	}
-	/**
-	 * Creates a KafkaSource that consumes a topic.
-	 *
-	 * @param zookeeperAddress
-	 *            Address of the Zookeeper host (with port number).
-	 * @param topicId
-	 *            ID of the Kafka topic.
-	 * @param groupId
-	 * 			   ID of the consumer group.
-	 * @param deserializationSchema
-	 *            User defined deserialization schema.
-	 * @param zookeeperSyncTimeMillis
-	 *            Synchronization time with zookeeper.
-	 * @param customProperties
-	 * 			  Custom properties for Kafka
-	 */
-	public KafkaSource(String zookeeperAddress,
-						String topicId, String groupId,
-						DeserializationSchema<OUT> deserializationSchema,
-						long zookeeperSyncTimeMillis, Properties customProperties)
-	{
-		super(deserializationSchema);
-		
-		Preconditions.checkNotNull(zookeeperAddress, "ZK address is null");
-		Preconditions.checkNotNull(topicId, "Topic ID is null");
-		Preconditions.checkNotNull(deserializationSchema, "deserializationSchema is null");
-		Preconditions.checkArgument(zookeeperSyncTimeMillis >= 0, "The ZK sync time must be positive");
-
-		this.zookeeperAddress = zookeeperAddress;
-		this.groupId = groupId;
-		this.topicId = topicId;
-		this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
-		this.customProperties = customProperties;
-	}
-
-	/**
-	 * Creates a KafkaSource that consumes a topic.
-	 *
-	 * @param zookeeperAddress
-	 *            Address of the Zookeeper host (with port number).
-	 * @param topicId
-	 *            ID of the Kafka topic.
-	 * @param deserializationSchema
-	 *            User defined deserialization schema.
-	 * @param zookeeperSyncTimeMillis
-	 *            Synchronization time with zookeeper.
-	 */
-	public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
-		this(zookeeperAddress, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis, null);
-	}
-	/**
-	 * Creates a KafkaSource that consumes a topic.
-	 *
-	 * @param zookeeperAddress
-	 *            Address of the Zookeeper host (with port number).
-	 * @param topicId
-	 *            ID of the Kafka topic.
-	 * @param deserializationSchema
-	 *            User defined deserialization schema.
-	 */
-	public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema) {
-		this(zookeeperAddress, topicId, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
-	}
-
-	/**
-	 * Initializes the connection to Kafka.
-	 */
-	private void initializeConnection() {
-		Properties props = new Properties();
-		props.put("zookeeper.connect", zookeeperAddress);
-		props.put("group.id", groupId);
-		props.put("zookeeper.session.timeout.ms", "10000");
-		props.put("zookeeper.sync.time.ms", Long.toString(zookeeperSyncTimeMillis));
-		props.put("auto.commit.interval.ms", "1000");
-
-		if (customProperties != null) {
-			for(Map.Entry<Object, Object> e : props.entrySet()) {
-				if(props.contains(e.getKey())) {
-					LOG.warn("Overwriting property "+e.getKey()+" with value "+e.getValue());
-				}
-				props.put(e.getKey(), e.getValue());
-			}
-		}
-
-		consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
-
-		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
-				.createMessageStreams(Collections.singletonMap(topicId, 1));
-		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicId);
-		KafkaStream<byte[], byte[]> stream = streams.get(0);
-
-		consumer.commitOffsets();
-
-		consumerIterator = stream.iterator();
-	}
-
-	@Override
-	public void run(SourceContext<OUT> ctx) throws Exception {
-		
-		// NOTE: Since this source is not checkpointed, we do not need to
-		// acquire the checkpoint lock
-		try {
-			while (isRunning && consumerIterator.hasNext()) {
-				OUT out = schema.deserialize(consumerIterator.next().message());
-				if (schema.isEndOfStream(out)) {
-					break;
-				}
-				ctx.collect(out);
-			}
-		} finally {
-			consumer.shutdown();
-		}
-	}
-
-	@Override
-	public void open(Configuration config) throws Exception {
-		initializeConnection();
-		isRunning = true;
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-		if (consumer != null) {
-			consumer.shutdown();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
deleted file mode 100644
index 7ae17df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.api.config;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Hacky wrapper to send an object instance through a Properties - map.
- *
- * This works as follows:
- * The recommended way of creating a KafkaSink is specifying a classname for the partitioner.
- *
- * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink.
- * This is set in the key-value (java.util.Properties) map.
- * In addition to that, we use the Properties.put(Object, Object) to store the instance of the (serializable).
- * This is a hack because the put() method is called on the underlying Hashmap.
- *
- * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
- *
- * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
- */
-public class PartitionerWrapper implements Partitioner {
-	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
-
-	private Partitioner wrapped;
-	public PartitionerWrapper(VerifiableProperties properties) {
-		wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
-	}
-
-	@Override
-	public int partition(Object value, int numberOfPartitions) {
-		return wrapped.partition(value, numberOfPartitions);
-	}
-}
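
For readers following the removal above: the "Properties hack" that PartitionerWrapper relied on comes down to smuggling a live object instance through the Properties map and fetching it back by key. A minimal, self-contained sketch of that mechanism follows; only the SERIALIZED_WRAPPER_NAME key is taken from the removed class, everything else is illustrative.

// PartitionerWrapperSketch.java -- illustrative only; all names other than the
// "flink.kafka.wrapper.serialized" key (SERIALIZED_WRAPPER_NAME above) are made up.
import java.util.Properties;

public class PartitionerWrapperSketch {

	public static void main(String[] args) {
		Properties props = new Properties();

		// Properties inherits put(Object, Object) from Hashtable, so a live object
		// can be stored even though the Properties API is nominally String -> String.
		Object myPartitioner = new Object(); // stands in for a serializable Partitioner instance
		props.put("flink.kafka.wrapper.serialized", myPartitioner);

		// Kafka is pointed at the wrapper class instead of the user's partitioner class.
		props.setProperty("partitioner.class",
				"org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper");

		// Inside PartitionerWrapper(VerifiableProperties), the instance is pulled back out by key:
		Object wrapped = props.get("flink.kafka.wrapper.serialized");
		System.out.println(wrapped == myPartitioner); // true -- same instance, no copy involved
	}
}
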

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 042fcdf..4181134 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -17,370 +17,36 @@
 
 package org.apache.flink.streaming.connectors.kafka.api.persistent;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import kafka.common.TopicAndPartition;
-import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Option;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
 
-import com.google.common.base.Preconditions;
 
 /**
- * Source for reading from Kafka using Flink Streaming Fault Tolerance.
- * This source is updating the committed offset in Zookeeper based on the internal checkpointing of Flink.
+ * Creates a Kafka consumer compatible with reading from Kafka 0.8.1+ brokers.
+ *
+ * This class is provided as a migration path from the old Flink Kafka connectors to the new, updated implementations.
+ *
+ * Please use FlinkKafkaConsumer081 or FlinkKafkaConsumer082 instead.
  *
- * Note that the autocommit feature of Kafka needs to be disabled for using this source.
+ * @param <T> The type of elements produced by this consumer.
  */
-public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> implements
-		ResultTypeQueryable<OUT>,
-		CheckpointNotifier, CheckpointedAsynchronously<long[]> {
+@Deprecated
+public class PersistentKafkaSource<T> extends FlinkKafkaConsumer<T> {
 
-	private static final long serialVersionUID = 287845877188312621L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
+	private static final long serialVersionUID = -8450689820627198228L;
 
-	private final LinkedMap pendingCheckpoints = new LinkedMap();
-
-	private final String topicName;
-	private final DeserializationSchema<OUT> deserializationSchema;
-	
-	private transient ConsumerConfig consumerConfig;
-	private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
-	private transient ConsumerConnector consumer;
-	
-	private transient ZkClient zkClient;
-
-	private transient long[] lastOffsets;			// Current offset (backed-up state)
-	protected transient long[] commitedOffsets; 	// maintain committed offsets, to avoid committing the same over and over again.
-	private transient long[] restoreState;			// set by the restore() method, used by open() to validate the restored state.
-
-	private volatile boolean running;
-	
 	/**
+	 * Creates a new Kafka 0.8.x streaming source consumer.
 	 *
-	 * For the @param consumerConfig, specify at least the "groupid" and "zookeeper.connect" string.
-	 * The config will be passed into the Kafka High Level Consumer.
-	 * For a full list of possible values, check this out: https://kafka.apache.org/documentation.html#consumerconfigs
-	 */
-	public PersistentKafkaSource(String topicName, DeserializationSchema<OUT> deserializationSchema, ConsumerConfig consumerConfig) {
-		Preconditions.checkNotNull(topicName);
-		Preconditions.checkNotNull(deserializationSchema);
-		Preconditions.checkNotNull(consumerConfig);
-
-		this.topicName = topicName;
-		this.deserializationSchema = deserializationSchema;
-		this.consumerConfig = consumerConfig;
-		if (consumerConfig.autoCommitEnable()) {
-			throw new IllegalArgumentException("'auto.commit.enable' is set to 'true'. " +
-					"This source can only be used with auto commit disabled because the " +
-					"source is committing to zookeeper by itself (not using the KafkaConsumer).");
-		}
-		if (!consumerConfig.offsetsStorage().equals("zookeeper")) {
-			// we can currently only commit to ZK.
-			throw new IllegalArgumentException("The 'offsets.storage' has to be set to 'zookeeper' for this Source to work reliably");
-		}
-	}
-
-	// ---------------------- ParallelSourceFunction Lifecycle -----------------
-
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(this.consumerConfig);
-		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
-		// will see each message only once.
-		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
-		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
-		if(streams.size() != 1) {
-			throw new RuntimeException("Expected only one message stream but got "+streams.size());
-		}
-		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
-		if(kafkaStreams == null) {
-			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
-		}
-		if(kafkaStreams.size() != 1) {
-			throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
-		}
-		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, consumerConfig.groupId());
-		this.iteratorToRead = kafkaStreams.get(0).iterator();
-		this.consumer = consumer;
-
-		zkClient = new ZkClient(consumerConfig.zkConnect(),
-				consumerConfig.zkSessionTimeoutMs(),
-				consumerConfig.zkConnectionTimeoutMs(),
-				new KafkaZKStringSerializer());
-
-		// most likely the number of offsets we're going to store here will be lower than the number of partitions.
-		int numPartitions = getNumberOfPartitions();
-		LOG.debug("The topic {} has {} partitions", topicName, numPartitions);
-		this.lastOffsets = new long[numPartitions];
-		this.commitedOffsets = new long[numPartitions];
-
-		// check if there are offsets to restore
-		if (restoreState != null) {
-			if (restoreState.length != numPartitions) {
-				throw new IllegalStateException("There are "+restoreState.length+" offsets to restore for topic "+topicName+" but " +
-						"there are only "+numPartitions+" in the topic");
-			}
-
-			LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(restoreState));
-			setOffsetsInZooKeeper(restoreState);
-			this.lastOffsets = restoreState;
-		} else {
-			// initialize empty offsets
-			Arrays.fill(this.lastOffsets, -1);
-		}
-		Arrays.fill(this.commitedOffsets, 0); // just to make it clear
-
-		pendingCheckpoints.clear();
-		running = true;
-	}
-
-	@Override
-	public void run(SourceContext<OUT> ctx) throws Exception {
-		if (iteratorToRead == null) {
-			throw new IllegalStateException("Kafka iterator not initialized properly.");
-		}
-
-		final Object checkpointLock = ctx.getCheckpointLock();
-		
-		while (running && iteratorToRead.hasNext()) {
-			MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
-			if(lastOffsets[message.partition()] >= message.offset()) {
-				LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
-				continue;
-			}
-			OUT next = deserializationSchema.deserialize(message.message());
-
-			if (deserializationSchema.isEndOfStream(next)) {
-				LOG.info("DeserializationSchema signaled end of stream for this source");
-				break;
-			}
-
-			// make the state update and the element emission atomic
-			synchronized (checkpointLock) {
-				lastOffsets[message.partition()] = message.offset();
-				ctx.collect(next);
-			}
-
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
-			}
-		}
-	}
-
-	@Override
-	public void cancel() {
-		running = false;
-	}
-
-	@Override
-	public void close() {
-		LOG.info("Closing Kafka consumer");
-		this.consumer.shutdown();
-		zkClient.close();
-	}
-
-	// -----------------  State Checkpointing -----------------
-
-	@Override
-	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		if (lastOffsets == null) {
-			LOG.warn("State snapshot requested on not yet opened source. Returning null");
-			return null;
-		}
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}",
-					Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
-		}
-
-		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
-
-		// the map may be asynchronously updated when committing to Kafka, so we synchronize
-		synchronized (pendingCheckpoints) {
-			pendingCheckpoints.put(checkpointId, currentOffsets);
-		}
-
-		return currentOffsets;
-	}
-
-	@Override
-	public void restoreState(long[] state) {
-		LOG.info("The state will be restored to {} in the open() method", Arrays.toString(state));
-		this.restoreState = Arrays.copyOf(state, state.length);
-	}
-
-	
-	/**
-	 * Notification on completed checkpoints
-	 * @param checkpointId The ID of the checkpoint that has been completed.
-	 * @throws Exception 
-	 */
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		LOG.info("Commit checkpoint {}", checkpointId);
-
-		long[] checkpointOffsets;
-
-		// the map may be asynchronously updated when snapshotting state, so we synchronize
-		synchronized (pendingCheckpoints) {
-			final int posInMap = pendingCheckpoints.indexOf(checkpointId);
-			if (posInMap == -1) {
-				LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
-				return;
-			}
-
-			checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
-			// remove older checkpoints in map:
-			if (!pendingCheckpoints.isEmpty()) {
-				for(int i = 0; i < posInMap; i++) {
-					pendingCheckpoints.remove(0);
-				}
-			}
-		}
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Committing offsets {} to ZooKeeper", Arrays.toString(checkpointOffsets));
-		}
-
-		setOffsetsInZooKeeper(checkpointOffsets);
-	}
-
-	private void setOffsetsInZooKeeper(long[] offsets) {
-		for (int partition = 0; partition < offsets.length; partition++) {
-			long offset = offsets[partition];
-			if(offset != -1) {
-				setOffset(partition, offset);
-			}
-		}
-	}
-
-	// --------------------- Zookeeper / Offset handling -----------------------------
-
-	private int getNumberOfPartitions() {
-		scala.collection.immutable.List<String> scalaSeq = JavaConversions.asScalaBuffer(Collections.singletonList(topicName)).toList();
-		scala.collection.mutable.Map<String, Seq<Object>> list =  ZkUtils.getPartitionsForTopics(zkClient, scalaSeq);
-		Option<Seq<Object>> topicOption = list.get(topicName);
-		if(topicOption.isEmpty()) {
-			throw new IllegalArgumentException("Unable to get number of partitions for topic "+topicName+" from "+list.toString());
-		}
-		Seq<Object> topic = topicOption.get();
-		return topic.size();
-	}
-
-	protected void setOffset(int partition, long offset) {
-		// synchronize because notifyCheckpointComplete is called using asynchronous worker threads (= multiple checkpoints might be confirmed concurrently)
-		synchronized (commitedOffsets) {
-			if(commitedOffsets[partition] < offset) {
-				setOffset(zkClient, consumerConfig.groupId(), topicName, partition, offset);
-				commitedOffsets[partition] = offset;
-			} else {
-				LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
-			}
-		}
-	}
-
-
-
-	// the following two methods are static to allow access from the outside as well (Testcases)
-
-	/**
-	 * This method's code is based on ZookeeperConsumerConnector.commitOffsetToZooKeeper()
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param valueDeserializer
+	 *           The deserializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param consumerConfig
+	 *           The consumer config used to configure the Kafka consumer client, and the ZooKeeper client.
 	 */
-	public static void setOffset(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
-		LOG.info("Setting offset for partition {} of topic {} in group {} to offset {}", partition, topic, groupId, offset);
-		TopicAndPartition tap = new TopicAndPartition(topic, partition);
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-		ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
-	}
-
-	public static long getOffset(ZkClient zkClient, String groupId, String topic, int partition) {
-		TopicAndPartition tap = new TopicAndPartition(topic, partition);
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-		scala.Tuple2<String, Stat> data = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition());
-		return Long.valueOf(data._1());
-	}
-
-
-	// ---------------------- (Java)Serialization methods for the consumerConfig -----------------
-
-	private void writeObject(ObjectOutputStream out)
-			throws IOException, ClassNotFoundException {
-		out.defaultWriteObject();
-		out.writeObject(consumerConfig.props().props());
-	}
-
-	private void readObject(ObjectInputStream in)
-			throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		Properties props = (Properties) in.readObject();
-		consumerConfig = new ConsumerConfig(props);
-	}
-
-
-	@Override
-	public TypeInformation<OUT> getProducedType() {
-		return deserializationSchema.getProducedType();
-	}
-
-	// ---------------------- Zookeeper Serializer copied from Kafka (because it has private access there)  -----------------
-
-	public static class KafkaZKStringSerializer implements ZkSerializer {
-
-		@Override
-		public byte[] serialize(Object data) throws ZkMarshallingError {
-			try {
-				return ((String) data).getBytes("UTF-8");
-			} catch (UnsupportedEncodingException e) {
-				throw new RuntimeException(e);
-			}
-		}
-
-		@Override
-		public Object deserialize(byte[] bytes) throws ZkMarshallingError {
-			if (bytes == null) {
-				return null;
-			} else {
-				try {
-					return new String(bytes, "UTF-8");
-				} catch (UnsupportedEncodingException e) {
-					throw new RuntimeException(e);
-				}
-			}
-		}
+	public PersistentKafkaSource(String topic, DeserializationSchema<T> valueDeserializer, ConsumerConfig consumerConfig) {
+		super(topic, valueDeserializer, consumerConfig.props().props(), OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
 	}
 }
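
The removed source above implements a commit-on-checkpoint pattern: offsets are recorded per checkpoint at snapshot time and written to ZooKeeper only once that checkpoint completes, at which point older pending checkpoints are discarded. A generic sketch of that bookkeeping, using a java.util SortedMap instead of the commons-collections LinkedMap from the removed code, with the ZooKeeper write stubbed out:

// CheckpointCommitSketch.java -- generic sketch of the pattern, not the removed class itself.
import java.util.Arrays;
import java.util.SortedMap;
import java.util.TreeMap;

public class CheckpointCommitSketch {

	// checkpoint id -> offsets captured when that checkpoint was snapshotted
	private final SortedMap<Long, long[]> pendingCheckpoints = new TreeMap<>();

	public long[] snapshotState(long checkpointId, long[] lastOffsets) {
		long[] copy = Arrays.copyOf(lastOffsets, lastOffsets.length);
		synchronized (pendingCheckpoints) {
			pendingCheckpoints.put(checkpointId, copy);
		}
		return copy;
	}

	public void notifyCheckpointComplete(long checkpointId) {
		long[] toCommit;
		synchronized (pendingCheckpoints) {
			toCommit = pendingCheckpoints.remove(checkpointId);
			if (toCommit == null) {
				return; // unknown checkpoint, or already superseded by a newer one
			}
			// older pending checkpoints are subsumed by this one and can be dropped
			pendingCheckpoints.headMap(checkpointId).clear();
		}
		// the removed source wrote the offsets to ZooKeeper at this point
		System.out.println("committing offsets " + Arrays.toString(toCommit));
	}
}
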

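For users of the old source, a hedged usage sketch of the migration path described in the new javadoc. The topic, group id, and connection strings are placeholders, and the FlinkKafkaConsumer081 constructor signature and the SimpleStringSchema class are assumptions, not taken from this diff:

// MigrationSketch.java -- placeholders throughout; a Kafka/ZooKeeper setup is still required to run it.
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.FlinkKafkaConsumer081;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MigrationSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "localhost:2181");  // placeholder
		props.setProperty("group.id", "flink-migration-sketch");   // placeholder
		props.setProperty("auto.commit.enable", "false");          // the old source required this as well

		// Deprecated path: new PersistentKafkaSource<>("my-topic", new SimpleStringSchema(),
		//                                              new ConsumerConfig(props));
		// as shown above, that constructor simply forwards the wrapped Properties to FlinkKafkaConsumer.

		// Recommended replacement named in the new javadoc:
		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer081<>("my-topic", new SimpleStringSchema(), props));

		stream.print();
		env.execute("Kafka migration sketch");
	}
}
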
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
deleted file mode 100644
index 661d0bd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-public class KafkaConstantPartitioner implements SerializableKafkaPartitioner {
-
-	private static final long serialVersionUID = 1L;
-	private int partition;
-
-	public KafkaConstantPartitioner(int partition) {
-		this.partition = partition;
-	}
-
-	@Override
-	public int partition(Object value, int numberOfPartitions) {
-		return partition;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
deleted file mode 100644
index 77a774e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import kafka.producer.Partitioner;
-import java.io.Serializable;
-
-public interface SerializableKafkaPartitioner extends Serializable, Partitioner {
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
new file mode 100644
index 0000000..cef1606
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the partition assignment is deterministic and stable.
+ */
+public class KafkaConsumerPartitionAssignmentTest {
+
+	@Test
+	public void testPartitionsEqualConsumers() {
+		try {
+			int[] partitions = {4, 52, 17, 1};
+			
+			for (int i = 0; i < partitions.length; i++) {
+				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+						partitions, "test-topic", partitions.length, i);
+				
+				assertNotNull(parts);
+				assertEquals(1, parts.size());
+				assertTrue(contains(partitions, parts.get(0).partition()));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testMultiplePartitionsPerConsumers() {
+		try {
+			final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+
+			final Set<Integer> allPartitions = new HashSet<>();
+			for (int i : partitions) {
+				allPartitions.add(i);
+			}
+			
+			final int numConsumers = 3;
+			final int minPartitionsPerConsumer = partitions.length / numConsumers;
+			final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1;
+			
+			for (int i = 0; i < numConsumers; i++) {
+				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+						partitions, "test-topic", numConsumers, i);
+
+				assertNotNull(parts);
+				assertTrue(parts.size() >= minPartitionsPerConsumer);
+				assertTrue(parts.size() <= maxPartitionsPerConsumer);
+
+				for (TopicPartition p : parts) {
+					// check that the element was actually contained
+					assertTrue(allPartitions.remove(p.partition()));
+				}
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPartitionsFewerThanConsumers() {
+		try {
+			final int[] partitions = {4, 52, 17, 1};
+
+			final Set<Integer> allPartitions = new HashSet<>();
+			for (int i : partitions) {
+				allPartitions.add(i);
+			}
+
+			final int numConsumers = 2 * partitions.length + 3;
+			
+			for (int i = 0; i < numConsumers; i++) {
+				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+						partitions, "test-topic", numConsumers, i);
+
+				assertNotNull(parts);
+				assertTrue(parts.size() <= 1);
+				
+				for (TopicPartition p : parts) {
+					// check that the element was actually contained
+					assertTrue(allPartitions.remove(p.partition()));
+				}
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testAssignEmptyPartitions() {
+		try {
+			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2);
+			assertNotNull(parts1);
+			assertTrue(parts1.isEmpty());
+
+			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0);
+			assertNotNull(parts2);
+			assertTrue(parts2.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testGrowingPartitionsRemainsStable() {
+		try {
+			final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+			final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7);
+
+			final Set<Integer> allNewPartitions = new HashSet<>();
+			final Set<Integer> allInitialPartitions = new HashSet<>();
+			for (int i : newPartitions) {
+				allNewPartitions.add(i);
+			}
+			for (int i : initialPartitions) {
+				allInitialPartitions.add(i);
+			}
+
+			final int numConsumers = 3;
+			final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers;
+			final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1;
+			final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers;
+			final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1;
+			
+			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(
+					initialPartitions, "test-topic", numConsumers, 0);
+			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(
+					initialPartitions, "test-topic", numConsumers, 1);
+			List<TopicPartition> parts3 = FlinkKafkaConsumer.assignPartitions(
+					initialPartitions, "test-topic", numConsumers, 2);
+
+			assertNotNull(parts1);
+			assertNotNull(parts2);
+			assertNotNull(parts3);
+			
+			assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
+			assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
+			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
+
+			for (TopicPartition p : parts1) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p.partition()));
+			}
+			for (TopicPartition p : parts2) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p.partition()));
+			}
+			for (TopicPartition p : parts3) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p.partition()));
+			}
+			
+			// all partitions must have been assigned
+			assertTrue(allInitialPartitions.isEmpty());
+			
+			// grow the set of partitions and distribute anew
+			
+			List<TopicPartition> parts1new = FlinkKafkaConsumer.assignPartitions(
+					newPartitions, "test-topic", numConsumers, 0);
+			List<TopicPartition> parts2new = FlinkKafkaConsumer.assignPartitions(
+					newPartitions, "test-topic", numConsumers, 1);
+			List<TopicPartition> parts3new = FlinkKafkaConsumer.assignPartitions(
+					newPartitions, "test-topic", numConsumers, 2);
+
+			// new partitions must include all old partitions
+			
+			assertTrue(parts1new.size() > parts1.size());
+			assertTrue(parts2new.size() > parts2.size());
+			assertTrue(parts3new.size() > parts3.size());
+			
+			assertTrue(parts1new.containsAll(parts1));
+			assertTrue(parts2new.containsAll(parts2));
+			assertTrue(parts3new.containsAll(parts3));
+
+			assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
+			assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
+			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
+
+			for (TopicPartition p : parts1new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p.partition()));
+			}
+			for (TopicPartition p : parts2new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p.partition()));
+			}
+			for (TopicPartition p : parts3new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p.partition()));
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allNewPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static boolean contains(int[] array, int value) {
+		for (int i : array) {
+			if (i == value) {
+				return true;
+			}
+		}
+		return false;
+	}
+}
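
The tests above pin down the contract of FlinkKafkaConsumer.assignPartitions: deterministic, balanced to within one partition per consumer, every partition assigned exactly once, and stable when new partitions are appended. One hypothetical implementation that satisfies all of these invariants (not necessarily the actual FlinkKafkaConsumer code, which is not shown in this hunk):

// AssignPartitionsSketch.java -- a hypothetical assignment consistent with the tests above.
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.TopicPartition;

public class AssignPartitionsSketch {

	public static List<TopicPartition> assignPartitions(
			int[] partitions, String topic, int numConsumers, int consumerIndex) {
		List<TopicPartition> assigned = new ArrayList<>();
		// Walk the partition array in its given order and take every numConsumers-th entry.
		// This is deterministic, balanced to within one partition, and -- because new
		// partitions are appended at the end of the array -- existing assignments stay put
		// when the topic grows, which is exactly what the tests above check.
		for (int i = 0; i < partitions.length; i++) {
			if (i % numConsumers == consumerIndex) {
				assigned.add(new TopicPartition(topic, partitions[i]));
			}
		}
		return assigned;
	}

	public static void main(String[] args) {
		int[] partitions = {4, 52, 17, 1};
		for (int consumer = 0; consumer < 3; consumer++) {
			System.out.println("consumer " + consumer + " -> "
					+ assignPartitions(partitions, "test-topic", 3, consumer));
		}
	}
}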