You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:04:04 UTC

[48/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
deleted file mode 100644
index 8066b3c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ /dev/null
@@ -1,689 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
- * Apache Kafka. The consumer can run in multiple parallel instances, each of which will pull
- * data from one or more Kafka partitions. 
- * 
- * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
- * during a failure, and that the computation processes elements "exactly once". 
- * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p>
- * 
- * <p>To support a variety of Kafka brokers, protocol versions, and offset committing approaches,
- * the Flink Kafka Consumer can be parametrized with a <i>fetcher</i> and an <i>offset handler</i>.</p>
- *
- * <h1>Fetcher</h1>
- * 
- * <p>The fetcher is responsible to pull data from Kafka. Because Kafka has undergone a change in
- * protocols and APIs, there are currently two fetchers available:</p>
- * 
- * <ul>
- *     <li>{@link FetcherType#NEW_HIGH_LEVEL}: A fetcher based on the new Kafka consumer API.
- *         This fetcher is generally more robust, but works only with later versions of
- *         Kafka (> 0.8.2).</li>
- *         
- *     <li>{@link FetcherType#LEGACY_LOW_LEVEL}: A fetcher based on the old low-level consumer API.
- *         This fetcher is works also with older versions of Kafka (0.8.1). The fetcher interprets
- *         the old Kafka consumer properties, like:
- *         <ul>
- *             <li>socket.timeout.ms</li>
- *             <li>socket.receive.buffer.bytes</li>
- *             <li>fetch.message.max.bytes</li>
- *             <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
- *             <li>fetch.wait.max.ms</li>
- *         </ul>
- *     </li>
- * </ul>
- * 
- * <h1>Offset handler</h1>
- * 
- * <p>Offsets whose records have been read and are checkpointed will be committed back to Kafka / ZooKeeper
- * by the offset handler. In addition, the offset handler finds the point where the source initially
- * starts reading from the stream, when the streaming job is started.</p>
- * 
- * <p>Currently, the source offers two different offset handlers exist:</p>
- * <ul>
- *     <li>{@link OffsetStore#KAFKA}: Use this offset handler when the Kafka brokers are managing the offsets,
- *         and hence offsets need to be committed the Kafka brokers, rather than to ZooKeeper.
- *         Note that this offset handler works only on new versions of Kafka (0.8.2.x +) and
- *         with the {@link FetcherType#NEW_HIGH_LEVEL} fetcher.</li>
- *         
- *     <li>{@link OffsetStore#FLINK_ZOOKEEPER}: Use this offset handler when the offsets are managed
- *         by ZooKeeper, as in older versions of Kafka (0.8.1.x)</li>
- * </ul>
- * 
- * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
- * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
- * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
- * has consumed a topic.</p>
- * 
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
- * is constructed. That means that the client that submits the program needs to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
- */
-public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
-		implements CheckpointNotifier, CheckpointedAsynchronously<long[]>, ResultTypeQueryable<T> {
-
-	/**
-	 * The offset store defines how acknowledged offsets are committed back to Kafka. Different
-	 * options include letting Flink periodically commit to ZooKeeper, or letting Kafka manage the
-	 * offsets (new Kafka versions only).
-	 */
-	public enum OffsetStore {
-
-		/**
-		 * Let Flink manage the offsets. Flink will periodically commit them to Zookeeper (usually after
-		 * successful checkpoints), in the same structure as Kafka 0.8.2.x
-		 * 
-		 * <p>Use this mode when using the source with Kafka 0.8.1.x brokers.</p>
-		 */
-		FLINK_ZOOKEEPER,
-
-		/**
-		 * Use the mechanisms in Kafka to commit offsets. Depending on the Kafka configuration, different
-		 * mechanism will be used (broker coordinator, zookeeper)
-		 */ 
-		KAFKA
-	}
-
-	/**
-	 * The fetcher type defines which code paths to use to pull data from teh Kafka broker.
-	 */
-	public enum FetcherType {
-
-		/**
-		 * The legacy fetcher uses Kafka's old low-level consumer API.
-		 * 
-		 * <p>Use this fetcher for Kafka 0.8.1 brokers.</p>
-		 */
-		LEGACY_LOW_LEVEL,
-
-		/**
-		 * This fetcher uses a backport of the new consumer API to pull data from the Kafka broker.
-		 * It is the fetcher that will be maintained in the future, and it already 
-		 * handles certain failure cases with less overhead than the legacy fetcher.
-		 * 
-		 * <p>This fetcher works only Kafka 0.8.2 and 0.8.3 (and future versions).</p>
-		 */
-		NEW_HIGH_LEVEL
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	private static final long serialVersionUID = -6272159445203409112L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
-
-	/** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
-	 * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
-	public static final long OFFSET_NOT_SET = -915623761776L;
-
-	/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
-	public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
-	/** Configuration key for the number of retries for getting the partition info */
-	public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
-
-	/** Default number of retries for getting the partition info. One retry means going through the full list of brokers */
-	public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
-
-	
-	
-	// ------  Configuration of the Consumer -------
-	
-	/** The offset store where this consumer commits safe offsets */
-	private final OffsetStore offsetStore;
-
-	/** The type of fetcher to be used to pull data from Kafka */
-	private final FetcherType fetcherType;
-	
-	/** name of the topic consumed by this source */
-	private final String topic;
-	
-	/** The properties to parametrize the Kafka consumer and ZooKeeper client */ 
-	private final Properties props;
-	
-	/** The ids of the partitions that are read by this consumer */
-	private final int[] partitions;
-	
-	/** The schema to convert between Kafka#s byte messages, and Flink's objects */
-	private final DeserializationSchema<T> valueDeserializer;
-
-	// ------  Runtime State  -------
-
-	/** Data for pending but uncommitted checkpoints */
-	private final LinkedMap pendingCheckpoints = new LinkedMap();
-	
-	/** The fetcher used to pull data from the Kafka brokers */
-	private transient Fetcher fetcher;
-	
-	/** The committer that persists the committed offsets */
-	private transient OffsetHandler offsetHandler;
-	
-	/** The partitions actually handled by this consumer */
-	private transient List<TopicPartition> subscribedPartitions;
-
-	/** The offsets of the last returned elements */
-	private transient long[] lastOffsets;
-
-	/** The latest offsets that have been committed to Kafka or ZooKeeper. These are never
-	 * newer then the last offsets (Flink's internal view is fresher) */
-	private transient long[] commitedOffsets;
-	
-	/** The offsets to restore to, if the consumer restores state from a checkpoint */
-	private transient long[] restoreToOffset;
-	
-	private volatile boolean running = true;
-	
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
-	 * 
-	 * <p>To determine which kink of fetcher and offset handler to use, please refer to the docs
-	 * at the beginnign of this class.</p>
-	 * 
-	 * @param topic 
-	 *           The Kafka topic to read from.
-	 * @param valueDeserializer
-	 *           The deserializer to turn raw byte messages into Java/Scala objects.
-	 * @param props
-	 *           The properties that are used to configure both the fetcher and the offset handler.
-	 * @param offsetStore
-	 *           The type of offset store to use (Kafka / ZooKeeper)
-	 * @param fetcherType
-	 *           The type of fetcher to use (new high-level API, old low-level API).
-	 */
-	public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props, 
-								OffsetStore offsetStore, FetcherType fetcherType) {
-		this.offsetStore = checkNotNull(offsetStore);
-		this.fetcherType = checkNotNull(fetcherType);
-
-		if(fetcherType == FetcherType.NEW_HIGH_LEVEL) {
-			throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 is not yet " +
-					"supported in Flink");
-		}
-		if (offsetStore == OffsetStore.KAFKA && fetcherType == FetcherType.LEGACY_LOW_LEVEL) {
-			throw new IllegalArgumentException(
-					"The Kafka offset handler cannot be used together with the old low-level fetcher.");
-		}
-		
-		this.topic = checkNotNull(topic, "topic");
-		this.props = checkNotNull(props, "props");
-		this.valueDeserializer = checkNotNull(valueDeserializer, "valueDeserializer");
-
-		// validate the zookeeper properties
-		if (offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
-			validateZooKeeperConfig(props);
-		}
-		
-		// Connect to a broker to get the partitions
-		List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props);
-
-		// get initial partitions list. The order of the partitions is important for consistent 
-		// partition id assignment in restart cases.
-		this.partitions = new int[partitionInfos.size()];
-		for (int i = 0; i < partitionInfos.size(); i++) {
-			partitions[i] = partitionInfos.get(i).partition();
-			
-			if (partitions[i] >= partitions.length) {
-				throw new RuntimeException("Kafka partition numbers are sparse");
-			}
-		}
-		LOG.info("Topic {} has {} partitions", topic, partitions.length);
-
-		// make sure that we take care of the committing
-		props.setProperty("enable.auto.commit", "false");
-	}
-
-	// ------------------------------------------------------------------------
-	//  Source life cycle
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		
-		final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
-		final int thisComsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
-		
-		// pick which partitions we work on
-		subscribedPartitions = assignPartitions(this.partitions, this.topic, numConsumers, thisComsumerIndex);
-		
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Kafka consumer {} will read partitions {} out of partitions {}",
-					thisComsumerIndex, subscribedPartitions, Arrays.toString(partitions));
-		}
-
-		// we leave the fetcher as null, if we have no partitions
-		if (subscribedPartitions.isEmpty()) {
-			LOG.info("Kafka consumer {} has no partitions (empty source)", thisComsumerIndex);
-			return;
-		}
-		
-		// create fetcher
-		switch (fetcherType){
-			case NEW_HIGH_LEVEL:
-				throw new UnsupportedOperationException("Currently unsupported");
-			case LEGACY_LOW_LEVEL:
-				fetcher = new LegacyFetcher(topic, props, getRuntimeContext().getTaskName());
-				break;
-			default:
-				throw new RuntimeException("Requested unknown fetcher " + fetcher);
-		}
-		fetcher.setPartitionsToRead(subscribedPartitions);
-
-		// offset handling
-		switch (offsetStore){
-			case FLINK_ZOOKEEPER:
-				offsetHandler = new ZookeeperOffsetHandler(props);
-				break;
-			case KAFKA:
-				throw new Exception("Kafka offset handler cannot work with legacy fetcher");
-			default:
-				throw new RuntimeException("Requested unknown offset store " + offsetStore);
-		}
-		
-		// set up operator state
-		lastOffsets = new long[partitions.length];
-		commitedOffsets = new long[partitions.length];
-		
-		Arrays.fill(lastOffsets, OFFSET_NOT_SET);
-		Arrays.fill(commitedOffsets, OFFSET_NOT_SET);
-		
-		// seek to last known pos, from restore request
-		if (restoreToOffset != null) {
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Consumer {} found offsets from previous checkpoint: {}",
-						thisComsumerIndex,  Arrays.toString(restoreToOffset));
-			}
-			
-			for (int i = 0; i < restoreToOffset.length; i++) {
-				long restoredOffset = restoreToOffset[i];
-				if (restoredOffset != OFFSET_NOT_SET) {
-					// if this fails because we are not subscribed to the topic, then the
-					// partition assignment is not deterministic!
-					
-					// we set the offset +1 here, because seek() is accepting the next offset to read,
-					// but the restore offset is the last read offset
-					fetcher.seek(new TopicPartition(topic, i), restoredOffset + 1);
-					lastOffsets[i] = restoredOffset;
-				}
-			}
-		}
-		else {
-			// no restore request. Let the offset handler take care of the initial offset seeking
-			offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
-		}
-	}
-
-	@Override
-	public void run(SourceContext<T> sourceContext) throws Exception {
-		if (fetcher != null) {
-			fetcher.run(sourceContext, valueDeserializer, lastOffsets);
-		}
-		else {
-			// this source never completes
-			final Object waitLock = new Object();
-			while (running) {
-				// wait until we are canceled
-				try {
-					//noinspection SynchronizationOnLocalVariableOrMethodParameter
-					synchronized (waitLock) {
-						waitLock.wait();
-					}
-				}
-				catch (InterruptedException e) {
-					// do nothing, check our "running" status
-				}
-			}
-		}
-		
-		// close the context after the work was done. this can actually only
-		// happen when the fetcher decides to stop fetching
-		sourceContext.close();
-	}
-
-	@Override
-	public void cancel() {
-		// set ourselves as not running
-		running = false;
-		
-		// close the fetcher to interrupt any work
-		Fetcher fetcher = this.fetcher;
-		this.fetcher = null;
-		if (fetcher != null) {
-			try {
-				fetcher.close();
-			}
-			catch (IOException e) {
-				LOG.warn("Error while closing Kafka connector data fetcher", e);
-			}
-		}
-		
-		OffsetHandler offsetHandler = this.offsetHandler;
-		this.offsetHandler = null;
-		if (offsetHandler != null) {
-			try {
-				offsetHandler.close();
-			}
-			catch (IOException e) {
-				LOG.warn("Error while closing Kafka connector offset handler", e);
-			}
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		cancel();
-		super.close();
-	}
-
-	@Override
-	public TypeInformation<T> getProducedType() {
-		return valueDeserializer.getProducedType();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Checkpoint and restore
-	// ------------------------------------------------------------------------
-
-	@Override
-	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		if (lastOffsets == null) {
-			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-			return null;
-		}
-		if (!running) {
-			LOG.debug("snapshotState() called on closed source");
-			return null;
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
-					Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
-		}
-
-		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
-
-		// the map cannot be asynchronously updated, because only one checkpoint call can happen
-		// on this function at a time: either snapshotState() or notifyCheckpointComplete()
-		pendingCheckpoints.put(checkpointId, currentOffsets);
-			
-		while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
-			pendingCheckpoints.remove(0);
-		}
-
-		return currentOffsets;
-	}
-
-	@Override
-	public void restoreState(long[] restoredOffsets) {
-		restoreToOffset = restoredOffsets;
-	}
-
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		if (fetcher == null) {
-			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
-			return;
-		}
-		if (!running) {
-			LOG.debug("notifyCheckpointComplete() called on closed source");
-			return;
-		}
-		
-		// only one commit operation must be in progress
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Committing offsets externally for checkpoint {}", checkpointId);
-		}
-
-		try {
-			long[] checkpointOffsets;
-	
-			// the map may be asynchronously updates when snapshotting state, so we synchronize
-			synchronized (pendingCheckpoints) {
-				final int posInMap = pendingCheckpoints.indexOf(checkpointId);
-				if (posInMap == -1) {
-					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
-					return;
-				}
-	
-				checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
-				
-				// remove older checkpoints in map
-				for (int i = 0; i < posInMap; i++) {
-					pendingCheckpoints.remove(0);
-				}
-			}
-	
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
-			}
-	
-			// build the map of (topic,partition) -> committed offset
-			Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
-			for (TopicPartition tp : subscribedPartitions) {
-				
-				int partition = tp.partition();
-				long offset = checkpointOffsets[partition];
-				long lastCommitted = commitedOffsets[partition];
-				
-				if (offset != OFFSET_NOT_SET) {
-					if (offset > lastCommitted) {
-						offsetsToCommit.put(tp, offset);
-						LOG.debug("Committing offset {} for partition {}", offset, partition);
-					}
-					else {
-						LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
-					}
-				}
-			}
-			
-			offsetHandler.commit(offsetsToCommit);
-		}
-		catch (Exception e) {
-			if (running) {
-				throw e;
-			}
-			// else ignore exception if we are no longer running
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Miscellaneous utilities 
-	// ------------------------------------------------------------------------
-
-	protected static List<TopicPartition> assignPartitions(int[] partitions, String topicName,
-															int numConsumers, int consumerIndex) {
-		checkArgument(numConsumers > 0);
-		checkArgument(consumerIndex < numConsumers);
-		
-		List<TopicPartition> partitionsToSub = new ArrayList<>();
-
-		for (int i = 0; i < partitions.length; i++) {
-			if (i % numConsumers == consumerIndex) {
-				partitionsToSub.add(new TopicPartition(topicName, partitions[i]));
-			}
-		}
-		return partitionsToSub;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Kafka / ZooKeeper communication utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Send request to Kafka to get partitions for topic.
-	 * 
-	 * @param topic The name of the topic.
-	 * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. 
-	 */
-	public static List<PartitionInfo> getPartitionsForTopic(final String topic, final Properties properties) {
-		String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
-		final int numRetries = Integer.valueOf(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, Integer.toString(DEFAULT_GET_PARTITIONS_RETRIES)));
-
-		checkNotNull(seedBrokersConfString, "Configuration property " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set");
-		String[] seedBrokers = seedBrokersConfString.split(",");
-		List<PartitionInfo> partitions = new ArrayList<>();
-
-		Random rnd = new Random();
-		retryLoop: for(int retry = 0; retry < numRetries; retry++) {
-			// we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
-			// parallel source instances start. Still, we try all available brokers.
-			int index = rnd.nextInt(seedBrokers.length);
-			brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
-				String seedBroker = seedBrokers[index];
-				LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
-				if (++index == seedBrokers.length) {
-					index = 0;
-				}
-
-				URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
-				SimpleConsumer consumer = null;
-				try {
-					final String clientId = "flink-kafka-consumer-partition-lookup";
-					final int soTimeout = Integer.valueOf(properties.getProperty("socket.timeout.ms", "30000"));
-					final int bufferSize = Integer.valueOf(properties.getProperty("socket.receive.buffer.bytes", "65536"));
-					consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
-
-					List<String> topics = Collections.singletonList(topic);
-					TopicMetadataRequest req = new TopicMetadataRequest(topics);
-					kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
-					List<TopicMetadata> metaData = resp.topicsMetadata();
-
-					// clear in case we have an incomplete list from previous tries
-					partitions.clear();
-					for (TopicMetadata item : metaData) {
-						if (item.errorCode() != ErrorMapping.NoError()) {
-							if (item.errorCode() == ErrorMapping.InvalidTopicCode() || item.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
-								// fail hard if topic is unknown
-								throw new RuntimeException("Requested partitions for unknown topic", ErrorMapping.exceptionFor(item.errorCode()));
-							}
-							// warn and try more brokers
-							LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions for " + topic,
-									ErrorMapping.exceptionFor(item.errorCode()));
-							continue brokersLoop;
-						}
-						if (!item.topic().equals(topic)) {
-							LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
-							continue brokersLoop;
-						}
-						for (PartitionMetadata part : item.partitionsMetadata()) {
-							Node leader = brokerToNode(part.leader());
-							Node[] replicas = new Node[part.replicas().size()];
-							for (int i = 0; i < part.replicas().size(); i++) {
-								replicas[i] = brokerToNode(part.replicas().get(i));
-							}
-
-							Node[] ISRs = new Node[part.isr().size()];
-							for (int i = 0; i < part.isr().size(); i++) {
-								ISRs[i] = brokerToNode(part.isr().get(i));
-							}
-							PartitionInfo pInfo = new PartitionInfo(topic, part.partitionId(), leader, replicas, ISRs);
-							partitions.add(pInfo);
-						}
-					}
-					break retryLoop; // leave the loop through the brokers
-				} catch (Exception e) {
-					LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topic, e);
-				} finally {
-					if (consumer != null) {
-						consumer.close();
-					}
-				}
-			} // brokers loop
-		} // retries loop
-		return partitions;
-	}
-
-	private static Node brokerToNode(Broker broker) {
-		return new Node(broker.id(), broker.host(), broker.port());
-	}
-	
-	protected static void validateZooKeeperConfig(Properties props) {
-		if (props.getProperty("zookeeper.connect") == null) {
-			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
-		}
-		if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
-			throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG
-					+ "' has not been set in the properties");
-		}
-		
-		try {
-			//noinspection ResultOfMethodCallIgnored
-			Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
-		}
-		catch (NumberFormatException e) {
-			throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
-		}
-		
-		try {
-			//noinspection ResultOfMethodCallIgnored
-			Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
-		}
-		catch (NumberFormatException e) {
-			throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
deleted file mode 100644
index 21f24e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Creates a Kafka consumer compatible with reading from Kafka 0.8.1.x brokers.
- * The consumer will internally use the old low-level Kafka API, and manually commit offsets
- * partition offsets to ZooKeeper.
- * 
- * <p>The following additional configuration values are available:</p>
- * <ul>
- *   <li>socket.timeout.ms</li>
- *   <li>socket.receive.buffer.bytes</li>
- *   <li>fetch.message.max.bytes</li>
- *   <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
- *   <li>fetch.wait.max.ms</li>
- * </ul>
- * 
- * @param <T> The type of elements produced by this consumer.
- */
-public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer<T> {
-
-	private static final long serialVersionUID = -5649906773771949146L;
-
-	/**
-	 * Creates a new Kafka 0.8.1.x streaming source consumer.
-	 *
-	 * @param topic
-	 *           The name of the topic that should be consumed.
-	 * @param valueDeserializer
-	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects. 
-	 * @param props
-	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-	 */
-	public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
-		super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
deleted file mode 100644
index 77e41e5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Creates a Kafka consumer compatible with reading from Kafka 0.8.2.x brokers.
- * The consumer will internally use the old low-level Kafka API, and manually commit offsets
- * partition offsets to ZooKeeper.
- *
- * Once Kafka released the new consumer with Kafka 0.8.3 Flink might use the 0.8.3 consumer API
- * also against Kafka 0.8.2 installations.
- *
- * @param <T> The type of elements produced by this consumer.
- */
-public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
-
-	private static final long serialVersionUID = -8450689820627198228L;
-
-	/**
-	 * Creates a new Kafka 0.8.2.x streaming source consumer.
-	 * 
-	 * @param topic
-	 *           The name of the topic that should be consumed.
-	 * @param valueDeserializer
-	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects. 
-	 * @param props
-	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-	 */
-	public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
-		super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
deleted file mode 100644
index 715f5ee..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Properties;
-
-
-/**
- * Flink Sink to produce data into a Kafka topic.
- *
- * Please note that this producer does not have any reliability guarantees.
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
-
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Array with the partition ids of the given topicId
-	 * The size of this array is the number of partitions
-	 */
-	private final int[] partitions;
-
-	/**
-	 * User defined properties for the Producer
-	 */
-	private final Properties producerConfig;
-
-	/**
-	 * The name of the topic this producer is writing data to
-	 */
-	private final String topicId;
-
-	/**
-	 * (Serializable) SerializationSchema for turning objects used with Flink into
-	 * byte[] for Kafka.
-	 */
-	private final SerializationSchema<IN, byte[]> schema;
-
-	/**
-	 * User-provided partitioner for assigning an object to a Kafka partition.
-	 */
-	private final KafkaPartitioner partitioner;
-
-	/**
-	 * Flag indicating whether to accept failures (and log them), or to fail on failures
-	 */
-	private boolean logFailuresOnly;
-	
-	// -------------------------------- Runtime fields ------------------------------------------
-
-	/** KafkaProducer instance */
-	private transient KafkaProducer<byte[], byte[]> producer;
-
-	/** The callback than handles error propagation or logging callbacks */
-	private transient Callback callback;
-	
-	/** Errors encountered in the async producer are stored here */
-	private transient volatile Exception asyncException;
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param brokerList
-	 *			Comma separated addresses of the brokers
-	 * @param topicId
-	 * 			ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 			User defined serialization schema.
-	 */
-	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param topicId
-	 * 			ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 			User defined serialization schema.
-	 * @param producerConfig
-	 * 			Properties with the producer configuration.
-	 */
-	public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, null);
-	}
-
-	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
-	 *
-	 * @param topicId The topic to write data to
-	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[]
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
-	 */
-	public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
-		Preconditions.checkNotNull(topicId, "TopicID not set");
-		Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
-		Preconditions.checkNotNull(producerConfig, "producerConfig not set");
-		ClosureCleaner.ensureSerializable(customPartitioner);
-		ClosureCleaner.ensureSerializable(serializationSchema);
-
-		this.topicId = topicId;
-		this.schema = serializationSchema;
-		this.producerConfig = producerConfig;
-
-		// set the producer configuration properties.
-
-		if(!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
-			this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
-		} else {
-			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-		}
-
-		if(!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
-			this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
-		} else {
-			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-		}
-
-
-		// create a local KafkaProducer to get the list of partitions.
-		// this will also ensure locally that all required ProducerConfig values are set.
-		try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) {
-			List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
-
-			this.partitions = new int[partitionsList.size()];
-			for (int i = 0; i < partitions.length; i++) {
-				partitions[i] = partitionsList.get(i).partition();
-			}
-			getPartitionsProd.close();
-		}
-
-		if (customPartitioner == null) {
-			this.partitioner = new FixedPartitioner();
-		} else {
-			this.partitioner = customPartitioner;
-		}
-	}
-
-	// ---------------------------------- Properties --------------------------
-
-	/**
-	 * Defines whether the producer should fail on errors, or only log them.
-	 * If this is set to true, then exceptions will be only logged, if set to false,
-	 * exceptions will be eventually thrown and cause the streaming program to 
-	 * fail (and enter recovery).
-	 * 
-	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
-	 */
-	public void setLogFailuresOnly(boolean logFailuresOnly) {
-		this.logFailuresOnly = logFailuresOnly;
-	}
-
-	// ----------------------------------- Utilities --------------------------
-	
-	/**
-	 * Initializes the connection to Kafka.
-	 */
-	@Override
-	public void open(Configuration configuration) {
-		producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
-
-		RuntimeContext ctx = getRuntimeContext();
-		partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
-
-		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", 
-				ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), topicId);
-		
-		if (logFailuresOnly) {
-			callback = new Callback() {
-				
-				@Override
-				public void onCompletion(RecordMetadata metadata, Exception e) {
-					if (e != null) {
-						LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
-					}
-				}
-			};
-		}
-		else {
-			callback = new Callback() {
-				@Override
-				public void onCompletion(RecordMetadata metadata, Exception exception) {
-					if (exception != null && asyncException == null) {
-						asyncException = exception;
-					}
-				}
-			};
-		}
-	}
-
-	/**
-	 * Called when new data arrives to the sink, and forwards it to Kafka.
-	 *
-	 * @param next
-	 * 		The incoming data
-	 */
-	@Override
-	public void invoke(IN next) throws Exception {
-		// propagate asynchronous errors
-		checkErroneous();
-		
-		byte[] serialized = schema.serialize(next);
-		ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicId,
-				partitioner.partition(next, partitions.length),
-				null, serialized);
-		
-		producer.send(record, callback);
-	}
-
-
-	@Override
-	public void close() throws Exception {
-		if (producer != null) {
-			producer.close();
-		}
-		
-		// make sure we propagate pending errors
-		checkErroneous();
-	}
-
-
-	// ----------------------------------- Utilities --------------------------
-
-	private void checkErroneous() throws Exception {
-		Exception e = asyncException;
-		if (e != null) {
-			// prevent double throwing
-			asyncException = null;
-			throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
-		}
-	}
-	
-	public static Properties getPropertiesFromBrokerList(String brokerList) {
-		String[] elements = brokerList.split(",");
-		for(String broker: elements) {
-			NetUtils.getCorrectHostnamePort(broker);
-		}
-		Properties props = new Properties();
-		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
-		return props;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
deleted file mode 100644
index f856926..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.api;
-
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-/**
- * Sink that emits its inputs to a Kafka topic.
- *
- * The KafkaSink has been relocated to org.apache.flink.streaming.connectors.kafka.KafkaSink.
- * This class will be removed in future releases of Flink.
- */
-@Deprecated
-public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
-	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
-		super(brokerList, topicId, serializationSchema);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
deleted file mode 100644
index 869c44f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.persistent;
-
-import kafka.consumer.ConsumerConfig;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-
-/**
- * Creates a Kafka consumer compatible with reading from Kafka 0.8.1+ consumers.
- *
- * This class is provided as a migration path from the old Flink kafka connectors to the new, updated implemntations.
- *
- * Please use FlinkKafkaConsumer081 and FlinkKafkaConsumer082.
- *
- * @param <T> The type of elements produced by this consumer.
- */
-@Deprecated
-public class PersistentKafkaSource<T> extends FlinkKafkaConsumer<T> {
-
-	private static final long serialVersionUID = -8450689820627198228L;
-
-	/**
-	 * Creates a new Kafka 0.8.2.x streaming source consumer.
-	 *
-	 * @param topic
-	 *           The name of the topic that should be consumed.
-	 * @param valueDeserializer
-	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-	 * @param consumerConfig
-	 *           The consumer config used to configure the Kafka consumer client, and the ZooKeeper client.
-	 */
-	public PersistentKafkaSource(String topic, DeserializationSchema<T> valueDeserializer, ConsumerConfig consumerConfig) {
-		super(topic, valueDeserializer, consumerConfig.props().props(), OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
deleted file mode 100644
index 4345926..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A fetcher pulls data from Kafka, from a fix set of partitions.
- * The fetcher supports "seeking" inside the partitions, i.e., moving to a different offset.
- */
-public interface Fetcher {
-
-	/**
-	 * Set which partitions the fetcher should pull from.
-	 * 
-	 * @param partitions The list of partitions for a topic that the fetcher will pull from.
-	 */
-	void setPartitionsToRead(List<TopicPartition> partitions);
-
-	/**
-	 * Closes the fetcher. This will stop any operation in the
-	 * {@link #run(SourceFunction.SourceContext, DeserializationSchema, long[])} method and eventually
-	 * close underlying connections and release all resources.
-	 */
-	void close() throws IOException;
-
-	/**
-	 * Starts fetch data from Kafka and emitting it into the stream.
-	 * 
-	 * <p>To provide exactly once guarantees, the fetcher needs emit a record and update the update
-	 * of the last consumed offset in one atomic operation:</p>
-	 * <pre>{@code
-	 * 
-	 * while (running) {
-	 *     T next = ...
-	 *     long offset = ...
-	 *     int partition = ...
-	 *     synchronized (sourceContext.getCheckpointLock()) {
-	 *         sourceContext.collect(next);
-	 *         lastOffsets[partition] = offset;
-	 *     }
-	 * }
-	 * }</pre>
-	 * 
-	 * @param sourceContext The source context to emit elements to.
-	 * @param valueDeserializer The deserializer to decode the raw values with.
-	 * @param lastOffsets The array into which to store the offsets foe which elements are emitted. 
-	 * 
-	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
-	 */
-	<T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, 
-					long[] lastOffsets) throws Exception;
-	
-	/**
-	 * Set the next offset to read from for the given partition.
-	 * For example, if the partition <i>i</i> offset is set to <i>n</i>, the Fetcher's next result
-	 * will be the message with <i>offset=n</i>.
-	 * 
-	 * @param topicPartition The partition for which to seek the offset.
-	 * @param offsetToRead To offset to seek to.
-	 */
-	void seek(TopicPartition topicPartition, long offsetToRead);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
deleted file mode 100644
index c4ba103..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ /dev/null
@@ -1,622 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.OffsetRequest;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.ErrorMapping;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.StringUtils;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * This fetcher uses Kafka's low-level API to pull data from a specific
- * set of partitions and offsets for a certain topic.
- * 
- * <p>This code is in parts based on the tutorial code for the low-level Kafka consumer.</p>
- */
-public class LegacyFetcher implements Fetcher {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
-
-	/** The topic from which this fetcher pulls data */
-	private final String topic;
-	
-	/** The properties that configure the Kafka connection */
-	private final Properties config;
-	
-	/** The task name, to give more readable names to the spawned threads */
-	private final String taskName;
-	
-	/** The first error that occurred in a connection thread */
-	private final AtomicReference<Throwable> error;
-
-	/** The partitions that the fetcher should read, with their starting offsets */
-	private Map<TopicPartition, Long> partitionsToRead;
-	
-	/** Reference the the thread that executed the run() method. */
-	private volatile Thread mainThread;
-	
-	/** Flag to shot the fetcher down */
-	private volatile boolean running = true;
-
-	public LegacyFetcher(String topic, Properties props, String taskName) {
-		this.config = checkNotNull(props, "The config properties cannot be null");
-		this.topic = checkNotNull(topic, "The topic cannot be null");
-		this.taskName = taskName;
-		this.error = new AtomicReference<>();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Fetcher methods
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void setPartitionsToRead(List<TopicPartition> partitions) {
-		partitionsToRead = new HashMap<>(partitions.size());
-		for (TopicPartition tp: partitions) {
-			partitionsToRead.put(tp, FlinkKafkaConsumer.OFFSET_NOT_SET);
-		}
-	}
-
-	@Override
-	public void seek(TopicPartition topicPartition, long offsetToRead) {
-		if (partitionsToRead == null) {
-			throw new IllegalArgumentException("No partitions to read set");
-		}
-		if (!partitionsToRead.containsKey(topicPartition)) {
-			throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition
-					+ ") we are not going to read. Partitions to read " + partitionsToRead);
-		}
-		partitionsToRead.put(topicPartition, offsetToRead);
-	}
-	
-	@Override
-	public void close() {
-		// flag needs to be check by the run() method that creates the spawned threads
-		this.running = false;
-		
-		// all other cleanup is made by the run method itself
-	}
-
-	@Override
-	public <T> void run(SourceFunction.SourceContext<T> sourceContext, 
-						DeserializationSchema<T> valueDeserializer,
-						long[] lastOffsets) throws Exception {
-		
-		if (partitionsToRead == null || partitionsToRead.size() == 0) {
-			throw new IllegalArgumentException("No partitions set");
-		}
-		
-		// NOTE: This method is needs to always release all resources it acquires
-		
-		this.mainThread = Thread.currentThread();
-
-		LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher");
-		
-		// get lead broker for each partition
-		
-		// NOTE: The kafka client apparently locks itself in an infinite loop sometimes
-		// when it is interrupted, so we run it only in a separate thread.
-		// since it sometimes refuses to shut down, we resort to the admittedly harsh
-		// means of killing the thread after a timeout.
-		PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topic, config);
-		infoFetcher.start();
-		
-		KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
-		watchDog.start();
-		
-		final List<PartitionInfo> allPartitionsInTopic = infoFetcher.getPartitions();
-		
-		// brokers to fetch partitions from.
-		int fetchPartitionsCount = 0;
-		Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<>();
-		
-		for (PartitionInfo partitionInfo : allPartitionsInTopic) {
-			if (partitionInfo.leader() == null) {
-				throw new RuntimeException("Unable to consume partition " + partitionInfo.partition()
-						+ " from topic "+partitionInfo.topic()+" because it does not have a leader");
-			}
-			
-			for (Map.Entry<TopicPartition, Long> entry : partitionsToRead.entrySet()) {
-				final TopicPartition topicPartition = entry.getKey();
-				final long offset = entry.getValue();
-				
-				// check if that partition is for us
-				if (topicPartition.partition() == partitionInfo.partition()) {
-					List<FetchPartition> partitions = fetchBrokers.get(partitionInfo.leader());
-					if (partitions == null) {
-						partitions = new ArrayList<>();
-						fetchBrokers.put(partitionInfo.leader(), partitions);
-					}
-					
-					partitions.add(new FetchPartition(topicPartition.partition(), offset));
-					fetchPartitionsCount++;
-					
-				}
-				// else this partition is not for us
-			}
-		}
-		
-		if (partitionsToRead.size() != fetchPartitionsCount) {
-			throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only "
-					+ fetchPartitionsCount + " partition infos with lead brokers.");
-		}
-
-		// create SimpleConsumers for each broker
-		ArrayList<SimpleConsumerThread<?>> consumers = new ArrayList<>(fetchBrokers.size());
-		
-		for (Map.Entry<Node, List<FetchPartition>> brokerInfo : fetchBrokers.entrySet()) {
-			final Node broker = brokerInfo.getKey();
-			final List<FetchPartition> partitionsList = brokerInfo.getValue();
-			
-			FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
-
-			SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config, topic,
-					broker, partitions, sourceContext, valueDeserializer, lastOffsets);
-
-			thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
-					taskName, broker.id(), broker.host(), broker.port()));
-			thread.setDaemon(true);
-			consumers.add(thread);
-		}
-		
-		// last check whether we should abort.
-		if (!running) {
-			return;
-		}
-		
-		// start all consumer threads
-		for (SimpleConsumerThread<?> t : consumers) {
-			LOG.info("Starting thread {}", t.getName());
-			t.start();
-		}
-		
-		// wait until all consumer threads are done, or until we are aborted, or until
-		// an error occurred in one of the fetcher threads
-		try {
-			boolean someConsumersRunning = true;
-			while (running && error.get() == null && someConsumersRunning) {
-				try {
-					// wait for the consumer threads. if an error occurs, we are interrupted
-					for (SimpleConsumerThread<?> t : consumers) {
-						t.join();
-					}
-	
-					// safety net
-					someConsumersRunning = false;
-					for (SimpleConsumerThread<?> t : consumers) {
-						someConsumersRunning |= t.isAlive();
-					}
-				}
-				catch (InterruptedException e) {
-					// ignore. we should notice what happened in the next loop check
-				}
-			}
-			
-			// make sure any asynchronous error is noticed
-			Throwable error = this.error.get();
-			if (error != null) {
-				throw new Exception(error.getMessage(), error);
-			}
-		}
-		finally {
-			// make sure that in any case (completion, abort, error), all spawned threads are stopped
-			for (SimpleConsumerThread<?> t : consumers) {
-				if (t.isAlive()) {
-					t.cancel();
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Reports an error from a fetch thread. This will cause the main thread to see this error,
-	 * abort, and cancel all other fetch threads.
-	 * 
-	 * @param error The error to report.
-	 */
-	void onErrorInFetchThread(Throwable error) {
-		if (this.error.compareAndSet(null, error)) {
-			// we are the first to report an error
-			if (mainThread != null) {
-				mainThread.interrupt();
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Representation of a partition to fetch.
-	 */
-	private static class FetchPartition {
-		
-		/** ID of the partition within the topic (0 indexed, as given by Kafka) */
-		int partition;
-		
-		/** Offset pointing at the next element to read from that partition. */
-		long nextOffsetToRead;
-
-		FetchPartition(int partition, long nextOffsetToRead) {
-			this.partition = partition;
-			this.nextOffsetToRead = nextOffsetToRead;
-		}
-		
-		@Override
-		public String toString() {
-			return "FetchPartition {partition=" + partition + ", offset=" + nextOffsetToRead + '}';
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Per broker fetcher
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Each broker needs its separate connection. This thread implements the connection to
-	 * one broker. The connection can fetch multiple partitions from the broker.
-	 * 
-	 * @param <T> The data type fetched.
-	 */
-	private static class SimpleConsumerThread<T> extends Thread {
-		
-		private final SourceFunction.SourceContext<T> sourceContext;
-		private final DeserializationSchema<T> valueDeserializer;
-		private final long[] offsetsState;
-		
-		private final FetchPartition[] partitions;
-		
-		private final Node broker;
-		private final String topic;
-		private final Properties config;
-
-		private final LegacyFetcher owner;
-
-		private SimpleConsumer consumer;
-		
-		private volatile boolean running = true;
-
-
-		// exceptions are thrown locally
-		public SimpleConsumerThread(LegacyFetcher owner,
-									Properties config, String topic,
-									Node broker,
-									FetchPartition[] partitions,
-									SourceFunction.SourceContext<T> sourceContext,
-									DeserializationSchema<T> valueDeserializer,
-									long[] offsetsState) {
-			this.owner = owner;
-			this.config = config;
-			this.topic = topic;
-			this.broker = broker;
-			this.partitions = partitions;
-			this.sourceContext = checkNotNull(sourceContext);
-			this.valueDeserializer = checkNotNull(valueDeserializer);
-			this.offsetsState = checkNotNull(offsetsState);
-		}
-
-		@Override
-		public void run() {
-			try {
-				// set up the config values
-				final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
-
-				// these are the actual configuration values of Kafka + their original default values.
-				final int soTimeout = Integer.valueOf(config.getProperty("socket.timeout.ms", "30000"));
-				final int bufferSize = Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536"));
-				final int fetchSize = Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576"));
-				final int maxWait = Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100"));
-				final int minBytes = Integer.valueOf(config.getProperty("fetch.min.bytes", "1"));
-				
-				// create the Kafka consumer that we actually use for fetching
-				consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
-
-				// make sure that all partitions have some offsets to start with
-				// those partitions that do not have an offset from a checkpoint need to get
-				// their start offset from ZooKeeper
-				{
-					List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
-
-					for (FetchPartition fp : partitions) {
-						if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
-							// retrieve the offset from the consumer
-							partitionsToGetOffsetsFor.add(fp);
-						}
-					}
-					if (partitionsToGetOffsetsFor.size() > 0) {
-						getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
-						LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
-								topic, partitionsToGetOffsetsFor);
-					}
-				}
-				
-				// Now, the actual work starts :-)
-				int OffsetOutOfRangeCount = 0;
-				while (running) {
-					FetchRequestBuilder frb = new FetchRequestBuilder();
-					frb.clientId(clientId);
-					frb.maxWait(maxWait);
-					frb.minBytes(minBytes);
-					
-					for (FetchPartition fp : partitions) {
-						frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, fetchSize);
-					}
-					kafka.api.FetchRequest fetchRequest = frb.build();
-					LOG.debug("Issuing fetch request {}", fetchRequest);
-
-					FetchResponse fetchResponse;
-					fetchResponse = consumer.fetch(fetchRequest);
-
-					if (fetchResponse.hasError()) {
-						String exception = "";
-						List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
-						for (FetchPartition fp : partitions) {
-							short code = fetchResponse.errorCode(topic, fp.partition);
-
-							if(code == ErrorMapping.OffsetOutOfRangeCode()) {
-								// we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
-								// Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
-								partitionsToGetOffsetsFor.add(fp);
-							} else if(code != ErrorMapping.NoError()) {
-								exception += "\nException for partition " + fp.partition + ": " +
-										StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
-							}
-						}
-						if (partitionsToGetOffsetsFor.size() > 0) {
-							// safeguard against an infinite loop.
-							if(OffsetOutOfRangeCount++ > 0) {
-								throw new RuntimeException("Found invalid offsets more than once in partitions "+partitionsToGetOffsetsFor.toString()+" " +
-										"Exceptions: "+exception);
-							}
-							// get valid offsets for these partitions and try again.
-							LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
-							getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
-							LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
-							continue; // jump back to create a new fetch request. The offset has not been touched.
-						} else {
-							// all partitions failed on an error
-							throw new IOException("Error while fetching from broker: " + exception);
-						}
-					}
-
-					int messagesInFetch = 0;
-					for (FetchPartition fp : partitions) {
-						final ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition);
-						final int partition = fp.partition;
-						
-						for (MessageAndOffset msg : messageSet) {
-							if (running) {
-								messagesInFetch++;
-								if (msg.offset() < fp.nextOffsetToRead) {
-									// we have seen this message already
-									LOG.info("Skipping message with offset " + msg.offset()
-											+ " because we have seen messages until " + fp.nextOffsetToRead
-											+ " from partition " + fp.partition + " already");
-									continue;
-								}
-								
-								ByteBuffer payload = msg.message().payload();
-								byte[] valueByte = new byte[payload.remaining()];
-								payload.get(valueByte);
-								
-								final T value = valueDeserializer.deserialize(valueByte);
-								final long offset = msg.offset();
-										
-								synchronized (sourceContext.getCheckpointLock()) {
-									sourceContext.collect(value);
-									offsetsState[partition] = offset;
-								}
-								
-								// advance offset for the next request
-								fp.nextOffsetToRead = offset + 1;
-							}
-							else {
-								// no longer running
-								return;
-							}
-						}
-					}
-					LOG.debug("This fetch contained {} messages", messagesInFetch);
-				}
-			}
-			catch (Throwable t) {
-				// report to the main thread
-				owner.onErrorInFetchThread(t);
-			}
-			finally {
-				// end of run loop. close connection to consumer
-				if (consumer != null) {
-					// closing the consumer should not fail the program
-					try {
-						consumer.close();
-					}
-					catch (Throwable t) {
-						LOG.error("Error while closing the Kafka simple consumer", t);
-					}
-				}
-			}
-		}
-
-		/**
-		 * Cancels this fetch thread. The thread will release all resources and terminate.
-		 */
-		public void cancel() {
-			this.running = false;
-			
-			// interrupt whatever the consumer is doing
-			if (consumer != null) {
-				consumer.close();
-			}
-			
-			this.interrupt();
-		}
-
-		/**
-		 * Request latest offsets for a set of partitions, via a Kafka consumer.
-		 *
-		 * @param consumer The consumer connected to lead broker
-		 * @param topic The topic name
-		 * @param partitions The list of partitions we need offsets for
-		 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
-		 */
-		private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
-
-			Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
-			for (FetchPartition fp: partitions) {
-				TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
-				requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-			}
-
-			kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
-			OffsetResponse response = consumer.getOffsetsBefore(request);
-
-			if (response.hasError()) {
-				String exception = "";
-				for (FetchPartition fp: partitions) {
-					short code;
-					if ( (code=response.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
-						exception += "\nException for partition "+fp.partition+": "+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
-					}
-				}
-				throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions
-						+ ". " + exception);
-			}
-
-			for (FetchPartition fp: partitions) {
-				// the resulting offset is the next offset we are going to read
-				// for not-yet-consumed partitions, it is 0.
-				fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
-			}
-		}
-
-		private static long getInvalidOffsetBehavior(Properties config) {
-			long timeType;
-			if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest")) {
-				timeType = OffsetRequest.LatestTime();
-			} else {
-				timeType = OffsetRequest.EarliestTime();
-			}
-			return timeType;
-		}
-	}
-	
-	private static class PartitionInfoFetcher extends Thread {
-
-		private final String topic;
-		private final Properties properties;
-		
-		private volatile List<PartitionInfo> result;
-		private volatile Throwable error;
-
-		
-		PartitionInfoFetcher(String topic, Properties properties) {
-			this.topic = topic;
-			this.properties = properties;
-		}
-
-		@Override
-		public void run() {
-			try {
-				result = FlinkKafkaConsumer.getPartitionsForTopic(topic, properties);
-			}
-			catch (Throwable t) {
-				this.error = t;
-			}
-		}
-		
-		public List<PartitionInfo> getPartitions() throws Exception {
-			try {
-				this.join();
-			}
-			catch (InterruptedException e) {
-				throw new Exception("Partition fetching was cancelled before completion");
-			}
-			
-			if (error != null) {
-				throw new Exception("Failed to fetch partitions for topic " + topic, error);
-			}
-			if (result != null) {
-				return result;
-			}
-			throw new Exception("Partition fetching failed");
-		}
-	}
-
-	private static class KillerWatchDog extends Thread {
-		
-		private final Thread toKill;
-		private final long timeout;
-
-		private KillerWatchDog(Thread toKill, long timeout) {
-			super("KillerWatchDog");
-			setDaemon(true);
-			
-			this.toKill = toKill;
-			this.timeout = timeout;
-		}
-
-		@SuppressWarnings("deprecation")
-		@Override
-		public void run() {
-			final long deadline = System.currentTimeMillis() + timeout;
-			long now;
-			
-			while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
-				try {
-					toKill.join(deadline - now);
-				}
-				catch (InterruptedException e) {
-					// ignore here, our job is important!
-				}
-			}
-			
-			// this is harsh, but this watchdog is a last resort
-			if (toKill.isAlive()) {
-				toKill.stop();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
deleted file mode 100644
index 2a82561..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The offset handler is responsible for locating the initial partition offsets 
- * where the source should start reading, as well as committing offsets from completed
- * checkpoints.
- */
-public interface OffsetHandler {
-
-	/**
-	 * Commits the given offset for the partitions. May commit the offsets to the Kafka broker,
-	 * or to ZooKeeper, based on its configured behavior.
-	 *
-	 * @param offsetsToCommit The offset to commit, per partition.
-	 */
-	void commit(Map<TopicPartition, Long> offsetsToCommit) throws Exception;
-
-	/**
-	 * Positions the given fetcher to the initial read offsets where the stream consumption
-	 * will start from.
-	 * 
-	 * @param partitions The partitions for which to seeks the fetcher to the beginning.
-	 * @param fetcher The fetcher that will pull data from Kafka and must be positioned.
-	 */
-	void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) throws Exception;
-
-	/**
-	 * Closes the offset handler, releasing all resources.
-	 * 
-	 * @throws IOException Thrown, if the closing fails.
-	 */
-	void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
deleted file mode 100644
index a38c3bd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Hacky wrapper to send an object instance through a Properties - map.
- *
- * This works as follows:
- * The recommended way of creating a KafkaSink is specifying a classname for the partitioner.
- *
- * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink.
- * This is set in the key-value (java.util.Properties) map.
- * In addition to that, we use the Properties.put(Object, Object) to store the instance of the (serializable).
- * This is a hack because the put() method is called on the underlying Hashmap.
- *
- * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
- *
- * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
- */
-public class PartitionerWrapper implements Partitioner {
-	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
-
-	private Partitioner wrapped;
-	public PartitionerWrapper(VerifiableProperties properties) {
-		wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
-	}
-
-	@Override
-	public int partition(Object value, int numberOfPartitions) {
-		return wrapped.partition(value, numberOfPartitions);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
deleted file mode 100644
index 001b6cb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import java.nio.charset.Charset;
-
-/**
- * Simple ZooKeeper serializer for Strings.
- */
-public class ZooKeeperStringSerializer implements ZkSerializer {
-
-	private static final Charset CHARSET = Charset.forName("UTF-8");
-	
-	@Override
-	public byte[] serialize(Object data) {
-		if (data instanceof String) {
-			return ((String) data).getBytes(CHARSET);
-		}
-		else {
-			throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
-		}
-	}
-
-	@Override
-	public Object deserialize(byte[] bytes) {
-		if (bytes == null) {
-			return null;
-		}
-		else {
-			return new String(bytes, CHARSET);
-		}
-	}
-}