You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/26 21:13:59 UTC

[09/10] flink git commit: [FLINK-2386] [kafka] Move Kafka connectors to 'org.apache.flink.streaming.connectors.kafka'

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
new file mode 100644
index 0000000..a98e84c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.cluster.Broker;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.collections.map.LinkedMap;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
+ * Apache Kafka. The consumer can run in multiple parallel instances, each of which will pull
+ * data from one or more Kafka partitions. 
+ * 
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once". 
+ * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
+ * 
+ * <p>To support a variety of Kafka brokers, protocol versions, and offset committing approaches,
+ * the Flink Kafka Consumer can be parametrized with a <i>fetcher</i> and an <i>offset handler</i>.</p>
+ *
+ * <h1>Fetcher</h1>
+ * 
+ * <p>The fetcher is responsible to pull data from Kafka. Because Kafka has undergone a change in
+ * protocols and APIs, there are currently two fetchers available:</p>
+ * 
+ * <ul>
+ *     <li>{@link FetcherType#NEW_HIGH_LEVEL}: A fetcher based on the new Kafka consumer API.
+ *         This fetcher is generally more robust, but works only with later versions of
+ *         Kafka (> 0.8.2).</li>
+ *         
+ *     <li>{@link FetcherType#LEGACY_LOW_LEVEL}: A fetcher based on the old low-level consumer API.
+ *         This fetcher also works with older versions of Kafka (0.8.1). The fetcher interprets
+ *         the old Kafka consumer properties, like:
+ *         <ul>
+ *             <li>socket.timeout.ms</li>
+ *             <li>socket.receive.buffer.bytes</li>
+ *             <li>fetch.message.max.bytes</li>
+ *             <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
+ *             <li>fetch.wait.max.ms</li>
+ *         </ul>
+ *     </li>
+ * </ul>
+ * 
+ * <h1>Offset handler</h1>
+ * 
+ * <p>Offsets whose records have been read and are checkpointed will be committed back to Kafka / ZooKeeper
+ * by the offset handler. In addition, the offset handler finds the point where the source initially
+ * starts reading from the stream, when the streaming job is started.</p>
+ * 
+ * <p>Currently, the source offers two different offset handlers:</p>
+ * <ul>
+ *     <li>{@link OffsetStore#KAFKA}: Use this offset handler when the Kafka brokers are managing the offsets,
+ *         and hence offsets need to be committed to the Kafka brokers, rather than to ZooKeeper.
+ *         Note that this offset handler works only on new versions of Kafka (0.8.2.x +) and
+ *         with the {@link FetcherType#NEW_HIGH_LEVEL} fetcher.</li>
+ *         
+ *     <li>{@link OffsetStore#FLINK_ZOOKEEPER}: Use this offset handler when the offsets are managed
+ *         by ZooKeeper, as in older versions of Kafka (0.8.1.x)</li>
+ * </ul>
+ * 
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ * 
+ * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
+ * is constructed. That means that the client that submits the program needs to be able to
+ * reach the Kafka brokers or ZooKeeper.</p>
+ */
+public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
+		implements CheckpointCommitter, CheckpointedAsynchronously<long[]>, ResultTypeQueryable<T> {
+
+	/**
+	 * The offset store defines how acknowledged offsets are committed back to Kafka. Different
+	 * options include letting Flink periodically commit to ZooKeeper, or letting Kafka manage the
+	 * offsets (new Kafka versions only).
+	 */
+	public enum OffsetStore {
+
+		/**
+		 * Let Flink manage the offsets. Flink will periodically commit them to ZooKeeper (usually after
+		 * successful checkpoints), in the same structure as Kafka 0.8.2.x.
+		 * 
+		 * <p>Use this mode when using the source with Kafka 0.8.1.x brokers.</p>
+		 */
+		FLINK_ZOOKEEPER,
+
+		/**
+		 * Use the mechanisms in Kafka to commit offsets. Depending on the Kafka configuration, different
+		 * mechanisms will be used (broker coordinator, ZooKeeper).
+		 */ 
+		KAFKA
+	}
+
+	/**
+	 * The fetcher type defines which code paths to use to pull data from the Kafka broker.
+	 */
+	public enum FetcherType {
+
+		/**
+		 * The legacy fetcher uses Kafka's old low-level consumer API.
+		 * 
+		 * <p>Use this fetcher for Kafka 0.8.1 brokers.</p>
+		 */
+		LEGACY_LOW_LEVEL,
+
+		/**
+		 * This fetcher uses a backport of the new consumer API to pull data from the Kafka broker.
+		 * It is the fetcher that will be maintained in the future, and it already 
+		 * handles certain failure cases with less overhead than the legacy fetcher.
+		 * 
+		 * <p>This fetcher works only with Kafka 0.8.2 and 0.8.3 (and future versions).</p>
+		 */
+		NEW_HIGH_LEVEL
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	private static final long serialVersionUID = -6272159445203409112L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
+
+	/** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
+	 * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
+	public static final long OFFSET_NOT_SET = -915623761776L;
+
+	/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
+	public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+	/** Configuration key for the number of retries for getting the partition info */
+	public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
+
+	/** Default number of retries for getting the partition info. One retry means going through the full list of brokers */
+	public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
+
+	
+	
+	// ------  Configuration of the Consumer -------
+	
+	/** The offset store where this consumer commits safe offsets */
+	private final OffsetStore offsetStore;
+
+	/** The type of fetcher to be used to pull data from Kafka */
+	private final FetcherType fetcherType;
+	
+	/** Name of the topic consumed by this source */
+	private final String topic;
+	
+	/** The properties to parametrize the Kafka consumer and ZooKeeper client */ 
+	private final Properties props;
+	
+	/** The ids of the partitions of the topic, in the order returned by the partition lookup
+	 * in the constructor. Each parallel instance reads a subset of them. */
+	private final int[] partitions;
+	
+	/** The schema to convert between Kafka's byte messages, and Flink's objects */
+	private final DeserializationSchema<T> valueDeserializer;
+
+	// ------  Runtime State  -------
+
+	/** Data for pending but uncommitted checkpoints (checkpoint id -> offsets at that checkpoint) */
+	private final LinkedMap pendingCheckpoints = new LinkedMap();
+	
+	/** The fetcher used to pull data from the Kafka brokers */
+	private transient Fetcher fetcher;
+	
+	/** The committer that persists the committed offsets */
+	private transient OffsetHandler offsetHandler;
+	
+	/** The partitions actually handled by this consumer */
+	private transient List<TopicPartition> subscribedPartitions;
+
+	/** The offsets of the last returned elements */
+	private transient long[] lastOffsets;
+
+	/** The latest offsets that have been committed to Kafka or ZooKeeper. These are never
+	 * newer than the last offsets (Flink's internal view is fresher) */
+	private transient long[] commitedOffsets;
+	
+	/** The offsets to restore to, if the consumer restores state from a checkpoint */
+	private transient long[] restoreToOffset;
+	
+	/** Flag that is set to false by cancel(); the idle wait loop in run() re-checks it */
+	private volatile boolean running = true;
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
+	 * 
+	 * <p>To determine which kind of fetcher and offset handler to use, please refer to the docs
+	 * at the beginning of this class.</p>
+	 * 
+	 * <p>The constructor contacts the brokers configured in the properties to look up the partitions
+	 * of the topic. It also sets the property "enable.auto.commit" to "false" on the given
+	 * properties object, because this consumer takes care of committing offsets itself.</p>
+	 * 
+	 * @param topic 
+	 *           The Kafka topic to read from.
+	 * @param valueDeserializer
+	 *           The deserializer to turn raw byte messages into Java/Scala objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 * @param offsetStore
+	 *           The type of offset store to use (Kafka / ZooKeeper)
+	 * @param fetcherType
+	 *           The type of fetcher to use (new high-level API, old low-level API).
+	 * 
+	 * @throws UnsupportedOperationException
+	 *           If the (not yet supported) new high-level fetcher is requested.
+	 * @throws IllegalArgumentException
+	 *           If the Kafka offset store is combined with the legacy fetcher, or if required
+	 *           ZooKeeper properties are missing or invalid.
+	 * @throws RuntimeException
+	 *           If no partitions could be retrieved for the topic, or if the partition ids are sparse.
+	 */
+	public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props, 
+								OffsetStore offsetStore, FetcherType fetcherType) {
+		this.offsetStore = checkNotNull(offsetStore);
+		this.fetcherType = checkNotNull(fetcherType);
+
+		if (fetcherType == FetcherType.NEW_HIGH_LEVEL) {
+			throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 is not yet " +
+					"supported in Flink");
+		}
+		if (offsetStore == OffsetStore.KAFKA && fetcherType == FetcherType.LEGACY_LOW_LEVEL) {
+			throw new IllegalArgumentException(
+					"The Kafka offset handler cannot be used together with the old low-level fetcher.");
+		}
+		
+		this.topic = checkNotNull(topic, "topic");
+		this.props = checkNotNull(props, "props");
+		this.valueDeserializer = checkNotNull(valueDeserializer, "valueDeserializer");
+
+		// validate the zookeeper properties
+		if (offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
+			validateZooKeeperConfig(props);
+		}
+		
+		// Connect to a broker to get the partitions
+		List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props);
+
+		// The lookup returns an empty list when no broker could be reached. Fail here instead of
+		// silently creating a source that never reads anything.
+		if (partitionInfos.isEmpty()) {
+			throw new RuntimeException("Unable to retrieve any partitions for topic " + topic
+					+ ". Please check previous log entries");
+		}
+
+		// get initial partitions list. The order of the partitions is important for consistent 
+		// partition id assignment in restart cases.
+		this.partitions = new int[partitionInfos.size()];
+		for (int i = 0; i < partitionInfos.size(); i++) {
+			partitions[i] = partitionInfos.get(i).partition();
+			
+			// partition ids are later used as array indexes, so they must be dense
+			if (partitions[i] >= partitions.length) {
+				throw new RuntimeException("Kafka partition numbers are sparse");
+			}
+		}
+		LOG.info("Topic {} has {} partitions", topic, partitions.length);
+
+		// make sure that we take care of the committing
+		props.setProperty("enable.auto.commit", "false");
+	}
+
+	// ------------------------------------------------------------------------
+	//  Source life cycle
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Initializes this parallel source instance: picks the partitions this subtask reads,
+	 * creates the fetcher and the offset handler, and seeks the fetcher either to the offsets
+	 * restored from a checkpoint or to the initial offsets determined by the offset handler.
+	 * 
+	 * <p>If no partition is assigned to this subtask, the fetcher stays {@code null} and the
+	 * runtime state is not initialized.</p>
+	 * 
+	 * @param parameters The configuration, which is passed on to the super class.
+	 * @throws Exception Thrown, if the fetcher or the offset handler cannot be set up.
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		
+		final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
+		final int thisComsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
+		
+		// pick which partitions we work on
+		subscribedPartitions = assignPartitions(this.partitions, this.topic, numConsumers, thisComsumerIndex);
+		
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Kafka consumer {} will read partitions {} out of partitions {}",
+					thisComsumerIndex, subscribedPartitions, Arrays.toString(partitions));
+		}
+
+		// we leave the fetcher as null, if we have no partitions
+		if (subscribedPartitions.isEmpty()) {
+			LOG.info("Kafka consumer {} has no partitions (empty source)", thisComsumerIndex);
+			return;
+		}
+		
+		// create fetcher
+		switch (fetcherType){
+			case NEW_HIGH_LEVEL:
+				throw new UnsupportedOperationException("Currently unsupported");
+			case LEGACY_LOW_LEVEL:
+				fetcher = new LegacyFetcher(topic, props, getRuntimeContext().getTaskName());
+				break;
+			default:
+				// report the requested type; the 'fetcher' field is still null at this point
+				throw new RuntimeException("Requested unknown fetcher " + fetcherType);
+		}
+		fetcher.setPartitionsToRead(subscribedPartitions);
+
+		// offset handling
+		switch (offsetStore){
+			case FLINK_ZOOKEEPER:
+				offsetHandler = new ZookeeperOffsetHandler(props);
+				break;
+			case KAFKA:
+				throw new Exception("Kafka offset handler cannot work with legacy fetcher");
+			default:
+				throw new RuntimeException("Requested unknown offset store " + offsetStore);
+		}
+		
+		// set up operator state
+		lastOffsets = new long[partitions.length];
+		commitedOffsets = new long[partitions.length];
+		
+		Arrays.fill(lastOffsets, OFFSET_NOT_SET);
+		Arrays.fill(commitedOffsets, OFFSET_NOT_SET);
+		
+		// seek to last known pos, from restore request
+		if (restoreToOffset != null) {
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Consumer {} found offsets from previous checkpoint: {}",
+						thisComsumerIndex,  Arrays.toString(restoreToOffset));
+			}
+			
+			for (int i = 0; i < restoreToOffset.length; i++) {
+				long restoredOffset = restoreToOffset[i];
+				if (restoredOffset != OFFSET_NOT_SET) {
+					// if this fails because we are not subscribed to the topic, then the
+					// partition assignment is not deterministic!
+					
+					// we set the offset +1 here, because seek() is accepting the next offset to read,
+					// but the restore offset is the last read offset
+					fetcher.seek(new TopicPartition(topic, i), restoredOffset + 1);
+					lastOffsets[i] = restoredOffset;
+				}
+			}
+		}
+		else {
+			// no restore request. Let the offset handler take care of the initial offset seeking
+			offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
+		}
+	}
+
+	@Override
+	public void run(SourceContext<T> sourceContext) throws Exception {
+		if (fetcher != null) {
+			fetcher.run(sourceContext, valueDeserializer, lastOffsets);
+		}
+		else {
+			// this source never completes
+			final Object waitLock = new Object();
+			while (running) {
+				// wait until we are canceled
+				try {
+					//noinspection SynchronizationOnLocalVariableOrMethodParameter
+					synchronized (waitLock) {
+						waitLock.wait();
+					}
+				}
+				catch (InterruptedException e) {
+					// do nothing, check our "running" status
+				}
+			}
+		}
+	}
+
+	/**
+	 * Cancels the source: marks it as not running and closes the fetcher and the offset
+	 * handler, if present. Errors during closing are logged and not propagated.
+	 */
+	@Override
+	public void cancel() {
+		// flag first, so that the idle loop in run() can terminate
+		running = false;
+		
+		// detach the fetcher from the field before closing it, which interrupts any work
+		final Fetcher fetcherToClose = this.fetcher;
+		this.fetcher = null;
+		if (fetcherToClose != null) {
+			try {
+				fetcherToClose.close();
+			}
+			catch (IOException e) {
+				LOG.warn("Error while closing Kafka connector data fetcher", e);
+			}
+		}
+		
+		// same procedure for the offset handler
+		final OffsetHandler handlerToClose = this.offsetHandler;
+		this.offsetHandler = null;
+		if (handlerToClose != null) {
+			try {
+				handlerToClose.close();
+			}
+			catch (IOException e) {
+				LOG.warn("Error while closing Kafka connector offset handler", e);
+			}
+		}
+	}
+
+	/**
+	 * Closes the source by running the same shutdown logic as {@link #cancel()} and then
+	 * closing the super class.
+	 */
+	@Override
+	public void close() throws Exception {
+		cancel();
+		super.close();
+	}
+
+	/**
+	 * Returns the type of the produced elements, as reported by the value deserializer.
+	 */
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return valueDeserializer.getProducedType();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpoint and restore
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Snapshots the offsets of the last returned elements and remembers them as a pending
+	 * checkpoint, so that they can be committed once the checkpoint is confirmed.
+	 * 
+	 * @param checkpointId The id of the checkpoint.
+	 * @param checkpointTimestamp The timestamp of the checkpoint (only used for logging here).
+	 * @return A copy of the current offsets, or {@code null} if the source has not been opened
+	 *         with partitions yet or is no longer running.
+	 */
+	@Override
+	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		if (lastOffsets == null) {
+			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
+			return null;
+		}
+		if (!running) {
+			LOG.debug("snapshotState() called on closed source");
+			return null;
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
+					Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+		}
+
+		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
+
+		// commitCheckpoint() guards the map with this monitor; take the same monitor here so
+		// that the lock actually protects the (not thread-safe) map on both sides
+		synchronized (pendingCheckpoints) {
+			pendingCheckpoints.put(checkpointId, currentOffsets);
+
+			// drop the oldest entries to bound the number of tracked checkpoints
+			while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+				pendingCheckpoints.remove(0);
+			}
+		}
+
+		return currentOffsets;
+	}
+
+	/**
+	 * Stores the offsets restored from a checkpoint. They are applied to the fetcher when
+	 * the source is opened.
+	 */
+	@Override
+	public void restoreState(long[] restoredOffsets) {
+		restoreToOffset = restoredOffsets;
+	}
+
+	/**
+	 * Commits the offsets that were snapshotted for the given checkpoint to the offset store.
+	 * 
+	 * <p>The pending entry of the checkpoint and all older pending entries are removed. Only
+	 * offsets that are set and newer than the last committed offset of their partition are
+	 * committed. After a successful commit, the committed offsets are remembered so that later
+	 * calls skip offsets that are already committed.</p>
+	 * 
+	 * @param checkpointId The id of the checkpoint that has been confirmed.
+	 * @throws RuntimeException If the offset handler fails to commit the offsets.
+	 */
+	@Override
+	public void commitCheckpoint(long checkpointId) {
+		if (fetcher == null) {
+			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+			return;
+		}
+		if (!running) {
+			LOG.debug("notifyCheckpointComplete() called on closed source");
+			return;
+		}
+		
+		// only one commit operation must be in progress
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Committing offsets externally for checkpoint {}", checkpointId);
+		}
+
+		long[] checkpointOffsets;
+
+		// the map may be asynchronously updated when snapshotting state, so we synchronize
+		synchronized (pendingCheckpoints) {
+			final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+			if (posInMap == -1) {
+				LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+				return;
+			}
+
+			checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
+			
+			// remove older checkpoints in map
+			for (int i = 0; i < posInMap; i++) {
+				pendingCheckpoints.remove(0);
+			}
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
+		}
+
+		// build the map of (topic,partition) -> committed offset
+		Map<TopicPartition, Long> offsetsToCommit = new HashMap<TopicPartition, Long>();
+		for (TopicPartition tp : subscribedPartitions) {
+			
+			int partition = tp.partition();
+			long offset = checkpointOffsets[partition];
+			long lastCommitted = commitedOffsets[partition];
+			
+			if (offset != OFFSET_NOT_SET) {
+				if (offset > lastCommitted) {
+					offsetsToCommit.put(tp, offset);
+					LOG.debug("Committing offset {} for partition {}", offset, partition);
+				}
+				else {
+					LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+				}
+			}
+		}
+		try {
+			offsetHandler.commit(offsetsToCommit);
+		} catch(Exception e) {
+			throw new RuntimeException("Unable to commit offset", e);
+		}
+
+		// the commit succeeded: remember the committed offsets, so that the comparison against
+		// 'lastCommitted' above can detect offsets that are already committed
+		for (Map.Entry<TopicPartition, Long> committed : offsetsToCommit.entrySet()) {
+			commitedOffsets[committed.getKey().partition()] = committed.getValue();
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Miscellaneous utilities 
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Selects the partitions that the consumer with the given index reads. The partitions are
+	 * distributed round-robin by their position in the given array: the consumer gets every
+	 * position {@code i} with {@code i % numConsumers == consumerIndex}.
+	 * 
+	 * @param partitions The ids of all partitions of the topic.
+	 * @param topicName The name of the topic.
+	 * @param numConsumers The number of parallel consumers; must be positive.
+	 * @param consumerIndex The index of this consumer; must be in {@code [0, numConsumers)}.
+	 * @return The partitions assigned to the consumer; empty if there are fewer partitions
+	 *         than consumers and this consumer gets none.
+	 */
+	protected static List<TopicPartition> assignPartitions(int[] partitions, String topicName,
+															int numConsumers, int consumerIndex) {
+		checkArgument(numConsumers > 0);
+		// a negative index would never match any position and silently yield an empty assignment
+		checkArgument(consumerIndex >= 0, "consumerIndex must not be negative");
+		checkArgument(consumerIndex < numConsumers);
+		
+		List<TopicPartition> partitionsToSub = new ArrayList<TopicPartition>();
+
+		// start at this consumer's index and step over the other consumers' positions
+		for (int i = consumerIndex; i < partitions.length; i += numConsumers) {
+			partitionsToSub.add(new TopicPartition(topicName, partitions[i]));
+		}
+		return partitionsToSub;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Kafka / ZooKeeper communication utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Send request to Kafka to get partitions for topic.
+	 * 
+	 * <p>The seed brokers are read from the bootstrap servers property (comma separated). Each
+	 * retry starts at a randomly picked seed broker and then goes through all seed brokers, until
+	 * one of them returns usable metadata for the topic. The number of retries is read from
+	 * the property {@link #GET_PARTITIONS_RETRIES_KEY}.</p>
+	 * 
+	 * @param topic The name of the topic.
+	 * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. 
+	 * @return The partitions that were found for the topic. The list may be empty, for example
+	 *         when every attempt to contact a broker failed with an exception.
+	 */
+	public static List<PartitionInfo> getPartitionsForTopic(final String topic, final Properties properties) {
+		String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+		final int numRetries = Integer.valueOf(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, Integer.toString(DEFAULT_GET_PARTITIONS_RETRIES)));
+
+		checkNotNull(seedBrokersConfString, "Configuration property " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set");
+		String[] seedBrokers = seedBrokersConfString.split(",");
+		List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
+
+		Random rnd = new Random();
+		retryLoop: for(int retry = 0; retry < numRetries; retry++) {
+			// we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
+			// parallel source instances start. Still, we try all available brokers.
+			int index = rnd.nextInt(seedBrokers.length);
+			brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
+				String seedBroker = seedBrokers[index];
+				// note: the logged try number is zero-based
+				LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
+				// advance to the next broker for the following iteration, wrapping around at the end
+				if (++index == seedBrokers.length) {
+					index = 0;
+				}
+
+				// this call is outside the try block, so an exception thrown here leaves the method
+				URL brokerUrl = NetUtils.getHostnamePort(seedBroker);
+				SimpleConsumer consumer = null;
+				try {
+					final String clientId = "flink-kafka-consumer-partition-lookup";
+					final int soTimeout = Integer.valueOf(properties.getProperty("socket.timeout.ms", "30000"));
+					final int bufferSize = Integer.valueOf(properties.getProperty("socket.receive.buffer.bytes", "65536"));
+					consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
+
+					List<String> topics = Collections.singletonList(topic);
+					TopicMetadataRequest req = new TopicMetadataRequest(topics);
+					kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+					List<TopicMetadata> metaData = resp.topicsMetadata();
+
+					// clear in case we have an incomplete list from previous tries
+					partitions.clear();
+					for (TopicMetadata item : metaData) {
+						if (item.errorCode() != ErrorMapping.NoError()) {
+							if (item.errorCode() == ErrorMapping.InvalidTopicCode() || item.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
+								// fail hard if topic is unknown
+								// NOTE(review): this exception is thrown inside the try block and is therefore
+								// caught by the 'catch (Exception e)' below, which only logs a warning and
+								// moves on to the next broker - confirm whether that is intended
+								throw new RuntimeException("Requested partitions for unknown topic", ErrorMapping.exceptionFor(item.errorCode()));
+							}
+							// warn and try more brokers
+							LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions for " + topic,
+									ErrorMapping.exceptionFor(item.errorCode()));
+							continue brokersLoop;
+						}
+						if (!item.topic().equals(topic)) {
+							LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
+							continue brokersLoop;
+						}
+						// convert the broker metadata of each partition into a PartitionInfo
+						for (PartitionMetadata part : item.partitionsMetadata()) {
+							Node leader = brokerToNode(part.leader());
+							Node[] replicas = new Node[part.replicas().size()];
+							for (int i = 0; i < part.replicas().size(); i++) {
+								replicas[i] = brokerToNode(part.replicas().get(i));
+							}
+
+							Node[] ISRs = new Node[part.isr().size()];
+							for (int i = 0; i < part.isr().size(); i++) {
+								ISRs[i] = brokerToNode(part.isr().get(i));
+							}
+							PartitionInfo pInfo = new PartitionInfo(topic, part.partitionId(), leader, replicas, ISRs);
+							partitions.add(pInfo);
+						}
+					}
+					// all metadata items were processed without error: stop retrying
+					break retryLoop; // leave the loop through the brokers
+				} catch (Exception e) {
+					LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topic, e);
+				} finally {
+					// runs on every path out of the try block, including the labeled continue / break
+					if (consumer != null) {
+						consumer.close();
+					}
+				}
+			} // brokers loop
+		} // retries loop
+		return partitions;
+	}
+
+	/**
+	 * Converts a broker descriptor into a {@link Node} with the same id, host, and port.
+	 */
+	private static Node brokerToNode(Broker broker) {
+		return new Node(broker.id(), broker.host(), broker.port());
+	}
+	
+	/**
+	 * Validates the properties that are needed for committing offsets to ZooKeeper: the
+	 * ZooKeeper connect string and the group id must be set, and the ZooKeeper session and
+	 * connection timeouts must be valid integers, if they are set.
+	 * 
+	 * @param props The properties to validate.
+	 * @throws IllegalArgumentException If a required property is missing, or a timeout is not an integer.
+	 */
+	protected static void validateZooKeeperConfig(Properties props) {
+		if (props.getProperty("zookeeper.connect") == null) {
+			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
+		}
+		if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+			throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG
+					+ "' has not been set in the properties");
+		}
+		
+		checkIntegerProperty(props, "zookeeper.session.timeout.ms");
+		checkIntegerProperty(props, "zookeeper.connection.timeout.ms");
+	}
+
+	/**
+	 * Checks that the property with the given key, if set, parses as an integer.
+	 */
+	private static void checkIntegerProperty(Properties props, String key) {
+		try {
+			//noinspection ResultOfMethodCallIgnored
+			Integer.parseInt(props.getProperty(key, "0"));
+		}
+		catch (NumberFormatException e) {
+			// keep the parse failure as the cause, so the offending value shows up in the trace
+			throw new IllegalArgumentException("Property '" + key + "' is not a valid integer", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
new file mode 100644
index 0000000..21f24e6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Creates a Kafka consumer compatible with reading from Kafka 0.8.1.x brokers.
+ * The consumer will internally use the old low-level Kafka API, and manually commit
+ * partition offsets to ZooKeeper.
+ * 
+ * <p>The following additional configuration values are available:</p>
+ * <ul>
+ *   <li>socket.timeout.ms</li>
+ *   <li>socket.receive.buffer.bytes</li>
+ *   <li>fetch.message.max.bytes</li>
+ *   <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
+ *   <li>fetch.wait.max.ms</li>
+ * </ul>
+ * 
+ * @param <T> The type of elements produced by this consumer.
+ */
+public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer<T> {
+
+	private static final long serialVersionUID = -5649906773771949146L;
+
+	/**
+	 * Creates a new Kafka 0.8.1.x streaming source consumer. The consumer is set up with the
+	 * ZooKeeper offset store and the legacy low-level fetcher.
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects. 
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
new file mode 100644
index 0000000..77e41e5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Creates a Kafka consumer compatible with reading from Kafka 0.8.2.x brokers.
+ * The consumer will internally use the old low-level Kafka API, and manually commit
+ * partition offsets to ZooKeeper.
+ *
+ * <p>Once Kafka has released the new consumer with Kafka 0.8.3, Flink might use the 0.8.3
+ * consumer API also against Kafka 0.8.2 installations.</p>
+ *
+ * @param <T> The type of elements produced by this consumer.
+ */
+public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
+
+	private static final long serialVersionUID = -8450689820627198228L;
+
+	/**
+	 * Creates a new Kafka 0.8.2.x streaming source consumer. The consumer is set up with the
+	 * {@code FLINK_ZOOKEEPER} offset store and the {@code LEGACY_LOW_LEVEL} fetcher type.
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param valueDeserializer
+	 *           The deserialization schema used to convert Kafka's byte messages into Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
new file mode 100644
index 0000000..ebda603
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.google.common.base.Preconditions;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.DefaultEncoder;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.PartitionerWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Sink that emits its inputs to a Kafka topic.
+ *
+ * @param <IN>
+ * 		Type of the sink input
+ */
+public class KafkaSink<IN> extends RichSinkFunction<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+
+	private Producer<IN, byte[]> producer; // created in open(), closed in close()
+	private Properties userDefinedProperties; // copied over the sink's defaults in open()
+	private String topicId;
+	private String brokerList;
+	private SerializationSchema<IN, byte[]> schema;
+	private SerializableKafkaPartitioner partitioner;
+	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
+
+	/**
+	 * Creates a KafkaSink for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 * 		Comma-separated addresses of the brokers
+	 * @param topicId
+	 * 		ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 		User defined serialization schema.
+	 */
+	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
+		this(brokerList, topicId, new Properties(), serializationSchema);
+	}
+
+	/**
+	 * Creates a KafkaSink for a given topic with custom Producer configuration.
+	 * Entries of the given configuration take precedence over the defaults that this sink
+	 * sets itself in open(), including "metadata.broker.list". All arguments must be non-null.
+	 *
+	 * @param brokerList
+	 * 		Comma-separated addresses of the brokers
+	 * @param topicId
+	 * 		ID of the Kafka topic.
+	 * @param producerConfig
+	 * 		Configurations of the Kafka producer
+	 * @param serializationSchema
+	 * 		User defined serialization schema.
+	 */
+	public KafkaSink(String brokerList, String topicId, Properties producerConfig, SerializationSchema<IN, byte[]> serializationSchema) {
+		Preconditions.checkNotNull(brokerList, "Broker list not set");
+		for (String broker : brokerList.split(",")) {
+			// result is discarded; NOTE(review): presumably called to validate the address format
+			NetUtils.getHostnamePort(broker);
+		}
+
+		this.brokerList = brokerList;
+		this.topicId = Preconditions.checkNotNull(topicId, "TopicID not set");
+		this.schema = Preconditions.checkNotNull(serializationSchema, "Serialization schema not set");
+		this.partitionerClass = null;
+		this.userDefinedProperties = Preconditions.checkNotNull(producerConfig, "Producer config not set");
+	}
+
+	/**
+	 * Creates a KafkaSink for a given topic that partitions with the given partitioner instance.
+	 *
+	 * @param brokerList
+	 * 		Comma-separated addresses of the brokers
+	 * @param topicId
+	 * 		ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 		User defined serialization schema.
+	 * @param partitioner
+	 * 		User defined partitioner.
+	 */
+	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
+		this(brokerList, topicId, serializationSchema);
+		ClosureCleaner.ensureSerializable(partitioner);
+		this.partitioner = partitioner;
+	}
+
+	/**
+	 * Creates a KafkaSink for a given topic; the class name is set as "partitioner.class" in open().
+	 */
+	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema, Class<? extends SerializableKafkaPartitioner> partitioner) {
+		this(brokerList, topicId, serializationSchema);
+		this.partitionerClass = partitioner;
+	}
+
+	/**
+	 * Builds the producer configuration (defaults, user overrides, partitioner) and creates the producer.
+	 */
+	@Override
+	public void open(Configuration configuration) {
+		// defaults set by the sink itself
+		Properties properties = new Properties();
+		properties.put("metadata.broker.list", brokerList);
+		properties.put("request.required.acks", "-1");
+		properties.put("message.send.max.retries", "10");
+		properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
+		// this will not be used as the key will not be serialized
+		properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
+
+		// user-defined entries override the defaults above
+		for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
+			properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
+		}
+
+		if (partitioner != null) {
+			properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
+			// java serialization will do the rest.
+			properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
+		}
+		if (partitionerClass != null) {
+			// Kafka looks this entry up as a String property, so the class name must be stored;
+			// Properties.getProperty(...) does not return values that are not Strings
+			properties.put("partitioner.class", partitionerClass.getName());
+		}
+
+		ProducerConfig config = new ProducerConfig(properties);
+
+		try {
+			producer = new Producer<IN, byte[]>(config);
+		} catch (NullPointerException e) {
+			throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
+		}
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Kafka.
+	 *
+	 * @param next
+	 * 		The incoming data
+	 */
+	@Override
+	public void invoke(IN next) {
+		byte[] serialized = schema.serialize(next);
+
+		// Sending message without serializable key.
+		producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
+	}
+
+	@Override
+	public void close() {
+		if (producer != null) { // producer is only assigned in open()
+			producer.close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
new file mode 100644
index 0000000..7b9f991
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.producer.Partitioner;
+
+import java.io.Serializable;
+
+public interface SerializableKafkaPartitioner extends Serializable, Partitioner {
+	// declares no methods of its own; it only combines Kafka's Partitioner with Serializable
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index ead24f3..c8400a5 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -22,11 +22,11 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
 /**
  * Sink that emits its inputs to a Kafka topic.
  *
- * The KafkaSink has been relocated to org.apache.flink.streaming.connectors.KafkaSink.
+ * The KafkaSink has been relocated to org.apache.flink.streaming.connectors.kafka.KafkaSink.
  * This class will be removed in future releases of Flink.
  */
 @Deprecated
-public class KafkaSink<IN> extends org.apache.flink.streaming.connectors.KafkaSink<IN> {
+public class KafkaSink<IN> extends org.apache.flink.streaming.connectors.kafka.KafkaSink<IN> {
 	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
 		super(brokerList, topicId, serializationSchema);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 4181134..869c44f 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka.api.persistent;
 
 import kafka.consumer.ConsumerConfig;
-import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
new file mode 100644
index 0000000..4345926
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A fetcher pulls data from Kafka, from a fixed set of partitions.
+ * The fetcher supports "seeking" inside the partitions, i.e., moving to a different offset.
+ */
+public interface Fetcher {
+
+	/**
+	 * Set which partitions the fetcher should pull from.
+	 * 
+	 * @param partitions The list of partitions for a topic that the fetcher will pull from.
+	 */
+	void setPartitionsToRead(List<TopicPartition> partitions);
+
+	/**
+	 * Closes the fetcher. This will stop any operation in the
+	 * {@link #run(SourceFunction.SourceContext, DeserializationSchema, long[])} method and eventually
+	 * close underlying connections and release all resources.
+	 */
+	void close() throws IOException;
+
+	/**
+	 * Starts fetching data from Kafka and emitting it into the stream.
+	 * 
+	 * <p>To provide exactly once guarantees, the fetcher needs to emit a record and update the
+	 * last consumed offset in one atomic operation:</p>
+	 * <pre>{@code
+	 * 
+	 * while (running) {
+	 *     T next = ...
+	 *     long offset = ...
+	 *     int partition = ...
+	 *     synchronized (sourceContext.getCheckpointLock()) {
+	 *         sourceContext.collect(next);
+	 *         lastOffsets[partition] = offset;
+	 *     }
+	 * }
+	 * }</pre>
+	 * 
+	 * @param sourceContext The source context to emit elements to.
+	 * @param valueDeserializer The deserializer to decode the raw values with.
+	 * @param lastOffsets The array into which to store the offsets for which elements are emitted.
+	 * 
+	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
+	 */
+	<T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, 
+					long[] lastOffsets) throws Exception;
+	
+	/**
+	 * Set the next offset to read from for the given partition.
+	 * For example, if the partition <i>i</i> offset is set to <i>n</i>, the Fetcher's next result
+	 * will be the message with <i>offset=n</i>.
+	 * 
+	 * @param topicPartition The partition for which to seek the offset.
+	 * @param offsetToRead The offset to seek to.
+	 */
+	void seek(TopicPartition topicPartition, long offsetToRead);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
new file mode 100644
index 0000000..328a96f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.api.FetchRequestBuilder;
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * This fetcher uses Kafka's low-level API to pull data from a specific
+ * set of partitions and offsets for a certain topic.
+ * 
+ * <p>This code is in parts based on the tutorial code for the low-level Kafka consumer.</p>
+ */
+public class LegacyFetcher implements Fetcher {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(LegacyFetcher.class);
+
+	/** The topic from which this fetcher pulls data */
+	private final String topic;
+	
+	/** The properties that configure the Kafka connection */
+	private final Properties config;
+	
+	/** The task name, to give more readable names to the spawned threads */
+	private final String taskName;
+	
+	/** The first error that occurred in a connection thread */
+	private final AtomicReference<Throwable> error;
+
+	/** The partitions that the fetcher should read, with their starting offsets */
+	private Map<TopicPartition, Long> partitionsToRead;
+	
+	/** Reference to the thread that executes the run() method. */
+	private volatile Thread mainThread;
+	
+	/** Flag to shut the fetcher down */
+	private volatile boolean running = true;
+
+	public LegacyFetcher(String topic, Properties props, String taskName) {
+		this.config = checkNotNull(props, "The config properties cannot be null");
+		this.topic = checkNotNull(topic, "The topic cannot be null");
+		this.taskName = taskName; // not null-checked; run() uses it to name the consumer threads
+		this.error = new AtomicReference<Throwable>(); // empty until a fetch thread reports an error
+	}
+
+	// ------------------------------------------------------------------------
+	//  Fetcher methods
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void setPartitionsToRead(List<TopicPartition> partitions) {
+		partitionsToRead = new HashMap<TopicPartition, Long>(partitions.size()); // replaces any earlier map
+		for (TopicPartition tp: partitions) {
+			partitionsToRead.put(tp, FlinkKafkaConsumer.OFFSET_NOT_SET); // no start offset until seek() sets one
+		}
+	}
+
+	@Override
+	public void seek(TopicPartition topicPartition, long offsetToRead) {
+		if (partitionsToRead == null) { // setPartitionsToRead() has not been called yet
+			throw new IllegalArgumentException("No partitions to read set");
+		}
+		if (!partitionsToRead.containsKey(topicPartition)) { // only assigned partitions can be positioned
+			throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition
+					+ ") we are not going to read. Partitions to read " + partitionsToRead);
+		}
+		partitionsToRead.put(topicPartition, offsetToRead); // overwrites the previous start offset
+	}
+	
+	@Override
+	public void close() {
+		// the flag is checked by the run() method, which creates the spawned threads
+		this.running = false;
+		
+		// all other cleanup is done by the run() method itself
+	}
+
+	@Override
+	public <T> void run(SourceFunction.SourceContext<T> sourceContext, 
+						DeserializationSchema<T> valueDeserializer,
+						long[] lastOffsets) throws Exception {
+		
+		if (partitionsToRead == null || partitionsToRead.size() == 0) {
+			throw new IllegalArgumentException("No partitions set");
+		}
+		
+		// NOTE: This method needs to always release all resources it acquires
+		
+		this.mainThread = Thread.currentThread();
+
+		LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher");
+		
+		// get lead broker for each partition
+		
+		// NOTE: The kafka client apparently locks itself in an infinite loop sometimes
+		// when it is interrupted, so we run it only in a separate thread.
+		// since it sometimes refuses to shut down, we resort to the admittedly harsh
+		// means of killing the thread after a timeout.
+		PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topic, config);
+		infoFetcher.start();
+		
+		KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000); // NOTE(review): presumably a timeout in ms - confirm
+		watchDog.start();
+		
+		final List<PartitionInfo> allPartitionsInTopic = infoFetcher.getPartitions();
+		
+		// brokers to fetch partitions from, each with the partitions it leads
+		int fetchPartitionsCount = 0;
+		Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<Node, List<FetchPartition>>();
+		
+		for (PartitionInfo partitionInfo : allPartitionsInTopic) {
+			if (partitionInfo.leader() == null) {
+				throw new RuntimeException("Unable to consume partition " + partitionInfo.partition()
+						+ " from topic "+partitionInfo.topic()+" because it does not have a leader");
+			}
+			
+			for (Map.Entry<TopicPartition, Long> entry : partitionsToRead.entrySet()) {
+				final TopicPartition topicPartition = entry.getKey();
+				final long offset = entry.getValue();
+				
+				// check if that partition is for us (compared by partition id only)
+				if (topicPartition.partition() == partitionInfo.partition()) {
+					List<FetchPartition> partitions = fetchBrokers.get(partitionInfo.leader());
+					if (partitions == null) {
+						partitions = new ArrayList<FetchPartition>();
+						fetchBrokers.put(partitionInfo.leader(), partitions);
+					}
+					
+					partitions.add(new FetchPartition(topicPartition.partition(), offset));
+					fetchPartitionsCount++;
+					
+				}
+				// else this partition is not for us
+			}
+		}
+		
+		if (partitionsToRead.size() != fetchPartitionsCount) {
+			throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only "
+					+ fetchPartitionsCount + " partition infos with lead brokers.");
+		}
+
+		// create one SimpleConsumerThread per broker (created here, started further below)
+		ArrayList<SimpleConsumerThread<?>> consumers = new ArrayList<SimpleConsumerThread<?>>(fetchBrokers.size());
+		
+		for (Map.Entry<Node, List<FetchPartition>> brokerInfo : fetchBrokers.entrySet()) {
+			final Node broker = brokerInfo.getKey();
+			final List<FetchPartition> partitionsList = brokerInfo.getValue();
+			
+			FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
+
+			SimpleConsumerThread<T> thread = new SimpleConsumerThread<T>(this, config, topic,
+					broker, partitions, sourceContext, valueDeserializer, lastOffsets);
+
+			thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
+					taskName, broker.id(), broker.host(), broker.port()));
+			thread.setDaemon(true);
+			consumers.add(thread);
+		}
+		
+		// last check whether we should abort (close() may have been called in the meantime)
+		if (!running) {
+			return;
+		}
+		
+		// start all consumer threads
+		for (SimpleConsumerThread<?> t : consumers) {
+			LOG.info("Starting thread {}", t.getName());
+			t.start();
+		}
+		
+		// wait until all consumer threads are done, or until we are aborted, or until
+		// an error occurred in one of the fetcher threads
+		try {
+			boolean someConsumersRunning = true;
+			while (running && error.get() == null && someConsumersRunning) {
+				try {
+					// wait for the consumer threads. if an error occurs, we are interrupted
+					for (SimpleConsumerThread<?> t : consumers) {
+						t.join();
+					}
+	
+					// safety net: re-check whether any thread is still alive
+					someConsumersRunning = false;
+					for (SimpleConsumerThread<?> t : consumers) {
+						someConsumersRunning |= t.isAlive();
+					}
+				}
+				catch (InterruptedException e) {
+					// ignore. we should notice what happened in the next loop check
+				}
+			}
+			
+			// make sure any asynchronous error is noticed
+			Throwable error = this.error.get();
+			if (error != null) {
+				throw new Exception(error.getMessage(), error);
+			}
+		}
+		finally {
+			// make sure that in any case (completion, abort, error), all spawned threads are stopped
+			for (SimpleConsumerThread<?> t : consumers) {
+				if (t.isAlive()) {
+					t.cancel();
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Reports an error from a fetch thread. Only the first reported error is stored; for that one,
+	 * the main thread (if already set by run()) is interrupted so that it sees the error and aborts.
+	 * 
+	 * @param error The error to report.
+	 */
+	void onErrorInFetchThread(Throwable error) {
+		if (this.error.compareAndSet(null, error)) {
+			// we are the first to report an error
+			if (mainThread != null) {
+				mainThread.interrupt();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Representation of a partition to fetch: the partition ID and the next offset to read.
+	 */
+	private static class FetchPartition {
+		
+		/** ID of the partition within the topic (0 indexed, as given by Kafka) */
+		int partition;
+		
+		/** Offset pointing at the next element to read from that partition. */
+		long nextOffsetToRead;
+
+		FetchPartition(int partition, long nextOffsetToRead) {
+			this.partition = partition;
+			this.nextOffsetToRead = nextOffsetToRead;
+		}
+		
+		@Override
+		public String toString() {
+			return "FetchPartition {partition=" + partition + ", offset=" + nextOffsetToRead + '}';
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Per broker fetcher
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Each broker needs its separate connection. This thread implements the connection to
+	 * one broker. The connection can fetch multiple partitions from the broker.
+	 * 
+	 * @param <T> The data type fetched.
+	 */
+	private static class SimpleConsumerThread<T> extends Thread {
+		
+		private final SourceFunction.SourceContext<T> sourceContext;
+		private final DeserializationSchema<T> valueDeserializer;
+		private final long[] offsetsState;
+		
+		private final FetchPartition[] partitions;
+		
+		private final Node broker;
+		private final String topic;
+		private final Properties config;
+
+		private final LegacyFetcher owner;
+
+		private SimpleConsumer consumer;
+		
+		private volatile boolean running = true;
+
+
+		// exceptions are thrown locally (presumably: the checkNotNull calls below fail in the constructing thread)
+		public SimpleConsumerThread(LegacyFetcher owner,
+									Properties config, String topic,
+									Node broker,
+									FetchPartition[] partitions,
+									SourceFunction.SourceContext<T> sourceContext,
+									DeserializationSchema<T> valueDeserializer,
+									long[] offsetsState) {
+			this.owner = owner;
+			this.config = config;
+			this.topic = topic;
+			this.broker = broker;
+			this.partitions = partitions;
+			this.sourceContext = checkNotNull(sourceContext);
+			this.valueDeserializer = checkNotNull(valueDeserializer);
+			this.offsetsState = checkNotNull(offsetsState);
+		}
+
+		@Override
+		public void run() {
+			try {
+				// set up the config values
+				final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
+
+				// these are the actual configuration values of Kafka + their original default values.
+				final int soTimeout = Integer.valueOf(config.getProperty("socket.timeout.ms", "30000"));
+				final int bufferSize = Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536"));
+				final int fetchSize = Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576"));
+				final int maxWait = Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100"));
+				final int minBytes = Integer.valueOf(config.getProperty("fetch.min.bytes", "1"));
+				
+				// create the Kafka consumer that we actually use for fetching
+				consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+
+				// make sure that all partitions have some offsets to start with
+				// those partitions that do not have an offset from a checkpoint need to get
+				// their start offset from ZooKeeper
+				
+				List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<FetchPartition>();
+
+				for (FetchPartition fp : partitions) {
+					if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
+						// retrieve the offset from the consumer
+						partitionsToGetOffsetsFor.add(fp);
+					}
+				}
+				if (partitionsToGetOffsetsFor.size() > 0) {
+					long timeType;
+					if (config.getProperty("auto.offset.reset", "latest").equals("latest")) {
+						timeType = OffsetRequest.LatestTime();
+					} else {
+						timeType = OffsetRequest.EarliestTime();
+					}
+					getLastOffset(consumer, topic, partitionsToGetOffsetsFor, timeType);
+					LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
+							topic, partitionsToGetOffsetsFor);
+				}
+				
+				// Now, the actual work starts :-)
+				
+				while (running) {
+					FetchRequestBuilder frb = new FetchRequestBuilder();
+					frb.clientId(clientId);
+					frb.maxWait(maxWait);
+					frb.minBytes(minBytes);
+					
+					for (FetchPartition fp : partitions) {
+						frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, fetchSize);
+					}
+					kafka.api.FetchRequest fetchRequest = frb.build();
+					LOG.debug("Issuing fetch request {}", fetchRequest);
+
+					FetchResponse fetchResponse;
+					fetchResponse = consumer.fetch(fetchRequest);
+
+					if (fetchResponse.hasError()) {
+						String exception = "";
+						for (FetchPartition fp : partitions) {
+							short code;
+							if ((code = fetchResponse.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
+								exception += "\nException for partition " + fp.partition + ": " + 
+										StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+							}
+						}
+						throw new IOException("Error while fetching from broker: " + exception);
+					}
+
+					int messagesInFetch = 0;
+					for (FetchPartition fp : partitions) {
+						final ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition);
+						final int partition = fp.partition;
+						
+						for (MessageAndOffset msg : messageSet) {
+							if (running) {
+								messagesInFetch++;
+								if (msg.offset() < fp.nextOffsetToRead) {
+									// we have seen this message already
+									LOG.info("Skipping message with offset " + msg.offset()
+											+ " because we have seen messages until " + fp.nextOffsetToRead
+											+ " from partition " + fp.partition + " already");
+									continue;
+								}
+								
+								ByteBuffer payload = msg.message().payload();
+								byte[] valueByte = new byte[payload.remaining()];
+								payload.get(valueByte);
+								
+								final T value = valueDeserializer.deserialize(valueByte);
+								final long offset = msg.offset();
+										
+								synchronized (sourceContext.getCheckpointLock()) {
+									sourceContext.collect(value);
+									offsetsState[partition] = offset;
+								}
+								
+								// advance offset for the next request
+								fp.nextOffsetToRead = offset + 1;
+							}
+							else {
+								// no longer running
+								return;
+							}
+						}
+					}
+					LOG.debug("This fetch contained {} messages", messagesInFetch);
+				}
+			}
+			catch (Throwable t) {
+				// report to the main thread
+				owner.onErrorInFetchThread(t);
+			}
+			finally {
+				// end of run loop. close connection to consumer
+				if (consumer != null) {
+					// closing the consumer should not fail the program
+					try {
+						consumer.close();
+					}
+					catch (Throwable t) {
+						LOG.error("Error while closing the Kafka simple consumer", t);
+					}
+				}
+			}
+		}
+
+		/**
+		 * Cancels this fetch thread. The thread will release all resources and terminate.
+		 */
+		public void cancel() {
+			this.running = false;
+			
+			// interrupt whatever the consumer is doing
+			if (consumer != null) {
+				consumer.close();
+			}
+			
+			this.interrupt();
+		}
+
+		/**
+		 * Request latest offsets for a set of partitions, via a Kafka consumer.
+		 *
+		 * @param consumer The consumer connected to lead broker
+		 * @param topic The topic name
+		 * @param partitions The list of partitions we need offsets for
+		 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
+		 */
+		private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
+
+			Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+			for (FetchPartition fp: partitions) {
+				TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
+				requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+			}
+
+			kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+			OffsetResponse response = consumer.getOffsetsBefore(request);
+
+			if (response.hasError()) {
+				String exception = "";
+				for (FetchPartition fp: partitions) {
+					short code;
+					if ( (code=response.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
+						exception += "\nException for partition "+fp.partition+": "+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+					}
+				}
+				throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions
+						+ ". " + exception);
+			}
+
+			for (FetchPartition fp: partitions) {
+				// the resulting offset is the next offset we are going to read
+				// for not-yet-consumed partitions, it is 0.
+				fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
+			}
+		}
+	}
+	
+	/**
+	 * Helper thread that fetches the partitions of a topic via
+	 * {@code FlinkKafkaConsumer.getPartitionsForTopic(topic, properties)}. The outcome
+	 * (result or error) is handed over to the thread that calls {@link #getPartitions()}.
+	 */
+	private static class PartitionInfoFetcher extends Thread {
+
+		private final String topic;
+		private final Properties properties;
+
+		// written by this thread in run(), read by the caller of getPartitions()
+		private volatile List<PartitionInfo> result;
+		private volatile Throwable error;
+
+
+		PartitionInfoFetcher(String topic, Properties properties) {
+			this.topic = topic;
+			this.properties = properties;
+		}
+
+		@Override
+		public void run() {
+			try {
+				result = FlinkKafkaConsumer.getPartitionsForTopic(topic, properties);
+			}
+			catch (Throwable t) {
+				// remember the failure, it is rethrown in getPartitions()
+				this.error = t;
+			}
+		}
+
+		/**
+		 * Waits for this thread to finish and returns the fetched partitions.
+		 *
+		 * @return The partitions of the topic.
+		 * @throws Exception Thrown, if the waiting thread is interrupted, if the fetching failed
+		 *                   (the failure is attached as the cause), or if the thread ended
+		 *                   without producing a result.
+		 */
+		public List<PartitionInfo> getPartitions() throws Exception {
+			try {
+				this.join();
+			}
+			catch (InterruptedException e) {
+				// keep the interruption as the cause, so it is visible in the stack trace
+				throw new Exception("Partition fetching was cancelled before completion", e);
+			}
+
+			if (error != null) {
+				throw new Exception("Failed to fetch partitions for topic " + topic, error);
+			}
+			if (result != null) {
+				return result;
+			}
+			throw new Exception("Partition fetching failed");
+		}
+	}
+
+	/**
+	 * Daemon thread that waits for another thread to terminate within a timeout and,
+	 * if that thread is still alive afterwards, forcibly stops it.
+	 */
+	private static class KillerWatchDog extends Thread {
+
+		private final Thread toKill;
+		private final long timeout;
+
+		private KillerWatchDog(Thread toKill, long timeout) {
+			super("KillerWatchDog");
+			setDaemon(true);
+
+			this.toKill = toKill;
+			this.timeout = timeout;
+		}
+
+		@SuppressWarnings("deprecation")
+		@Override
+		public void run() {
+			final long deadline = System.currentTimeMillis() + timeout;
+
+			// keep waiting until the target thread is gone or the deadline has passed
+			long remaining = deadline - System.currentTimeMillis();
+			while (remaining > 0 && toKill.isAlive()) {
+				try {
+					toKill.join(remaining);
+				}
+				catch (InterruptedException ignored) {
+					// interruptions do not release the watchdog from its duty
+				}
+				remaining = deadline - System.currentTimeMillis();
+			}
+
+			// last resort: the thread did not terminate in time
+			if (toKill.isAlive()) {
+				toKill.stop();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
new file mode 100644
index 0000000..2a82561
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The offset handler is responsible for locating the initial partition offsets 
+ * where the source should start reading, as well as committing offsets from completed
+ * checkpoints.
+ */
+public interface OffsetHandler {
+
+	/**
+	 * Commits the given offset for the partitions. May commit the offsets to the Kafka broker,
+	 * or to ZooKeeper, based on its configured behavior.
+	 *
+	 * @param offsetsToCommit The offset to commit, per partition.
+	 * @throws Exception Declared so that implementations can report a failed commit.
+	 */
+	void commit(Map<TopicPartition, Long> offsetsToCommit) throws Exception;
+
+	/**
+	 * Positions the given fetcher to the initial read offsets where the stream consumption
+	 * will start from.
+	 * 
+	 * @param partitions The partitions for which to seek the fetcher to the initial offsets.
+	 * @param fetcher The fetcher that will pull data from Kafka and must be positioned.
+	 * @throws Exception Declared so that implementations can report a failure to look up
+	 *                   or apply the initial offsets.
+	 */
+	void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) throws Exception;
+
+	/**
+	 * Closes the offset handler, releasing all resources.
+	 * 
+	 * @throws IOException Thrown, if the closing fails.
+	 */
+	void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
new file mode 100644
index 0000000..a38c3bd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.producer.Partitioner;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * Hacky wrapper to send an object instance through a Properties - map.
+ *
+ * This works as follows:
+ * The recommended way of creating a KafkaSink is specifying a classname for the partitioner.
+ *
+ * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink.
+ * This is set in the key-value (java.util.Properties) map.
+ * In addition to that, we use Properties.put(Object, Object) to store the (serializable) partitioner instance
+ * under the key {@link #SERIALIZED_WRAPPER_NAME}.
+ * This is a hack because the put() method is called on the underlying Hashmap.
+ *
+ * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
+ *
+ * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
+ */
+public class PartitionerWrapper implements Partitioner {
+	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
+
+	/** The partitioner instance found in the properties; null if none was stored. */
+	private final Partitioner wrapped;
+
+	/**
+	 * Extracts the wrapped partitioner instance from the given properties.
+	 *
+	 * @param properties The properties that hold the partitioner instance under the key
+	 *                   {@link #SERIALIZED_WRAPPER_NAME}.
+	 */
+	public PartitionerWrapper(VerifiableProperties properties) {
+		wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
+	}
+
+	/**
+	 * Delegates the partitioning decision to the wrapped partitioner.
+	 *
+	 * @throws IllegalStateException Thrown, if the properties given to the constructor
+	 *                               contained no partitioner instance.
+	 */
+	@Override
+	public int partition(Object value, int numberOfPartitions) {
+		if (wrapped == null) {
+			// fail with a diagnosable message rather than a bare NullPointerException
+			throw new IllegalStateException("No partitioner instance was found under the key '"
+					+ SERIALIZED_WRAPPER_NAME + "' in the properties");
+		}
+		return wrapped.partition(value, numberOfPartitions);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
new file mode 100644
index 0000000..001b6cb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * Simple ZooKeeper serializer for Strings.
+ */
+public class ZooKeeperStringSerializer implements ZkSerializer {
+
+	private static final Charset CHARSET = Charset.forName("UTF-8");
+
+	/**
+	 * Encodes the given string as UTF-8 bytes.
+	 *
+	 * @throws IllegalArgumentException Thrown, if the given object is not a string.
+	 */
+	@Override
+	public byte[] serialize(Object data) {
+		if (!(data instanceof String)) {
+			throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
+		}
+		final String text = (String) data;
+		return text.getBytes(CHARSET);
+	}
+
+	/**
+	 * Decodes the given UTF-8 bytes into a string; a null array yields null.
+	 */
+	@Override
+	public Object deserialize(byte[] bytes) {
+		return bytes == null ? null : new String(bytes, CHARSET);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
new file mode 100644
index 0000000..42a5951
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.common.TopicAndPartition;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class ZookeeperOffsetHandler implements OffsetHandler {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
+	
+	private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
+	
+	
+	private final ZkClient zkClient;
+	
+	private final String groupId;
+
+	
+	public ZookeeperOffsetHandler(Properties props) {
+		this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+		
+		if (this.groupId == null) {
+			throw new IllegalArgumentException("Required property '"
+					+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
+		}
+		
+		String zkConnect = props.getProperty("zookeeper.connect");
+		if (zkConnect == null) {
+			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
+		}
+		
+		zkClient = new ZkClient(zkConnect,
+				Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
+				Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
+				new ZooKeeperStringSerializer());
+	}
+
+
+	@Override
+	public void commit(Map<TopicPartition, Long> offsetsToCommit) {
+		for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
+			TopicPartition tp = entry.getKey();
+			long offset = entry.getValue();
+			
+			if (offset >= 0) {
+				setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset);
+			}
+		}
+	}
+
+	@Override
+	public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
+		for (TopicPartition tp : partitions) {
+			long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition());
+
+			if (offset != OFFSET_NOT_SET) {
+				LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
+						tp.partition(), offset);
+
+				// the offset in Zookeeper was the last read offset, seek is accepting the next-to-read-offset.
+				fetcher.seek(tp, offset + 1);
+			}
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		zkClient.close();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Communication with Zookeeper
+	// ------------------------------------------------------------------------
+	
+	public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
+		TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+		ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
+	}
+
+	public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
+		TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+
+		scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient,
+				topicDirs.consumerOffsetDir() + "/" + tap.partition());
+
+		if (data._1().isEmpty()) {
+			return OFFSET_NOT_SET;
+		} else {
+			return Long.valueOf(data._1().get());
+		}
+	}
+}