Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 23:03:19 UTC

[08/10] flink git commit: [FLINK-1935] Reimplement PersistentKafkaSource using high level Kafka API

[FLINK-1935] Reimplement PersistentKafkaSource using high level Kafka API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54e95761
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54e95761
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54e95761

Branch: refs/heads/master
Commit: 54e957614c38fed69baf726fc86059e9b11384cb
Parents: 3b2ee23
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Apr 23 12:29:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 21:35:58 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java |   3 +-
 .../checkpoint/CheckpointCoordinator.java       |   2 +-
 .../runtime/operators/RegularPactTask.java      |   2 +-
 .../flink/runtime/state/OperatorState.java      |  87 ----
 .../flink/runtime/state/StateCheckpoint.java    |  59 ---
 .../streaming/connectors/ConnectorSource.java   |  14 +-
 .../connectors/kafka/KafkaConsumerExample.java  |   1 -
 .../kafka/KafkaSimpleConsumerExample.java       |  58 ---
 .../flink/streaming/connectors/kafka/Utils.java |   3 +-
 .../connectors/kafka/api/KafkaSink.java         |  53 ++-
 .../connectors/kafka/api/KafkaSource.java       |   1 -
 .../api/persistent/PersistentKafkaSource.java   | 346 ++++++++++++++++
 .../kafka/api/simple/KafkaTopicUtils.java       | 250 -----------
 .../kafka/api/simple/MessageWithMetadata.java   |  46 ---
 .../kafka/api/simple/PersistentKafkaSource.java | 284 -------------
 .../simple/iterator/KafkaConsumerIterator.java  |  46 ---
 .../iterator/KafkaIdleConsumerIterator.java     |  57 ---
 .../KafkaMultiplePartitionsIterator.java        | 116 ------
 .../iterator/KafkaSinglePartitionIterator.java  | 392 ------------------
 .../api/simple/offset/BeginningOffset.java      |  32 --
 .../kafka/api/simple/offset/CurrentOffset.java  |  32 --
 .../kafka/api/simple/offset/GivenOffset.java    |  39 --
 .../kafka/api/simple/offset/KafkaOffset.java    |  87 ----
 .../kafka/api/simple/offset/Offset.java         |  32 --
 .../streaming/connectors/kafka/KafkaITCase.java | 410 +++++++++++++------
 .../connectors/kafka/KafkaTopicUtilsTest.java   | 152 -------
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../checkpoint/CheckpointedAsynchronously.java  |   4 +-
 .../api/environment/LocalStreamEnvironment.java |   9 +-
 .../environment/StreamExecutionEnvironment.java |   8 +-
 .../functions/source/GenericSourceFunction.java |  25 --
 .../flink/streaming/api/graph/StreamGraph.java  |  19 +-
 .../streaming/api/operators/StreamOperator.java |   9 +-
 .../streaming/api/state/PartitionableState.java |  66 ---
 .../streaming/runtime/tasks/StreamTask.java     |   7 +-
 .../serialization/DeserializationSchema.java    |   4 +-
 .../serialization/JavaDefaultStringSchema.java  |   7 +
 .../streaming/util/serialization/RawSchema.java |   8 +
 .../util/serialization/SimpleStringSchema.java  |   7 +
 .../streaming/api/state/OperatorStateTest.java  |  44 --
 40 files changed, 711 insertions(+), 2112 deletions(-)
----------------------------------------------------------------------
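For context, here is a minimal usage sketch of the reimplemented source, based on the constructor and the configuration checks in the new PersistentKafkaSource further below. The Zookeeper address, group id and topic name are placeholders, and the exact property set is an assumption derived from those checks rather than documented API:

	import java.util.Properties;

	import kafka.consumer.ConsumerConfig;

	import org.apache.flink.streaming.api.datastream.DataStream;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
	import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
	import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;

	public class PersistentKafkaSourceExample {

		public static void main(String[] args) throws Exception {
			Properties props = new Properties();
			props.setProperty("zookeeper.connect", "localhost:2181");  // placeholder Zookeeper quorum
			props.setProperty("group.id", "flink-example-group");      // placeholder consumer group
			props.setProperty("auto.commit.enable", "false");          // required: the source commits offsets itself
			props.setProperty("offsets.storage", "zookeeper");         // required: the source only commits to Zookeeper

			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

			DataStream<String> kafkaStream = env.addSource(
					new PersistentKafkaSource<String>("my-topic",       // placeholder topic
							new JavaDefaultStringSchema(),
							new ConsumerConfig(props)));

			kafkaStream.print();
			env.execute();
		}
	}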


http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 0fec67f..2dcb76f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1445,12 +1445,11 @@ public class TypeExtractor {
 
 	public static <X> TypeInformation<X> getForObject(X value) {
 		return new TypeExtractor().privateGetForObject(value);
-
 	}
+
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private <X> TypeInformation<X> privateGetForObject(X value) {
 		Validate.notNull(value);
-		
 		// check if we can extract the types from tuples, otherwise work with the class
 		if (value instanceof Tuple) {
 			Tuple t = (Tuple) value;

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b3f6587..be0fcf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -147,7 +147,7 @@ public class CheckpointCoordinator {
 				return;
 			}
 			shutdown = true;
-			LOG.info("Stopping checkpoint coordinator jor job " + job);
+			LOG.info("Stopping checkpoint coordinator for job " + job);
 			
 			// shut down the thread that handles the timeouts
 			timer.cancel();

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 2bee094..c844d8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -507,7 +507,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			// Collect the accumulators of all involved UDFs and send them to the
 			// JobManager. close() has been called earlier for all involved UDFs
 			// (using this.stub.close() and closeChainedTasks()), so UDFs can no longer
-			// modify accumulators.ll;
+			// modify accumulators;
 			if (this.stub != null) {
 				// collect the counters from the stub
 				if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
deleted file mode 100644
index 8dfd715..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.Serializable;
-
-/**
- * Abstract class for representing operator states in Flink programs. By
- * implementing the methods declared in this abstraction the state of the
- * operator can be checkpointed by the fault tolerance mechanism.
- * 
- * @param <T>
- *            The type of the operator state.
- */
-public class OperatorState<T> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private T stateObject;
-
-	/**
-	 * Initializes the state using the given state object.
-	 * 
-	 * @param initialState
-	 *            The initial state object
-	 */
-	public OperatorState(T initialState) {
-		stateObject = initialState;
-	}
-
-	/**
-	 * Returns the currently stored state object.
-	 * 
-	 * @return The state.
-	 */
-	public T getState() {
-		return stateObject;
-	}
-
-	/**
-	 * Updates the current state object. States should be only updated using
-	 * this method to avoid concurrency issues.
-	 * 
-	 * @param stateUpdate
-	 *            The update applied.
-	 */
-	@SuppressWarnings("unchecked")
-	public synchronized void update(Object stateUpdate) {
-		this.stateObject = (T) stateUpdate;
-	}
-
-	/**
-	 * Creates a {@link StateCheckpoint} that will be used to backup the state
-	 * for failure recovery. This method will be called by the state
-	 * checkpointer.
-	 * 
-	 * @return The {@link StateCheckpoint} created.
-	 */
-	public synchronized StateCheckpoint<T> checkpoint() {
-		return new StateCheckpoint<T>(this);
-	}
-
-	@Override
-	public String toString() {
-		return stateObject.toString();
-	}
-
-	public boolean stateEquals(OperatorState<T> other) {
-		return stateObject.equals(other.stateObject);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
deleted file mode 100644
index a872e5a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.Serializable;
-
-/**
- * Base class for creating checkpoints for {@link OperatorState}. This
- * checkpoints will be used to backup states in stateful Flink operators and
- * also to restore them in case of node failure. To allow incremental
- * checkpoints override the {@link #update(StateCheckpoint)} method.
- * 
- * @param <T>
- *            The type of the state.
- */
-public class StateCheckpoint<T> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	public OperatorState<T> checkpointedState;
-
-	/**
-	 * Creates a state checkpoint from the given {@link OperatorState}
-	 * 
-	 * @param operatorState
-	 *            The {@link OperatorState} to checkpoint.
-	 */
-	public StateCheckpoint(OperatorState<T> operatorState) {
-		this.checkpointedState = operatorState;
-	}
-
-	public OperatorState<T> restore() {
-		return checkpointedState;
-	}
-
-	@Override
-	public String toString() {
-		return checkpointedState.toString();
-	}
-
-	public boolean stateEquals(StateCheckpoint<T> other) {
-		return checkpointedState.equals(other.checkpointedState);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
index caabb21..fde4cdf 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -19,13 +19,10 @@ package org.apache.flink.streaming.connectors;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
-public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements
-		GenericSourceFunction<OUT> {
+public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT>{
 
 	private static final long serialVersionUID = 1L;
 	protected DeserializationSchema<OUT> schema;
@@ -35,12 +32,7 @@ public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OU
 	}
 
 	@Override
-	public TypeInformation<OUT> getType() {
-		if(schema instanceof ResultTypeQueryable) {
-			return ((ResultTypeQueryable<OUT>) schema).getProducedType();
-		}
-		return TypeExtractor.createTypeInfo(DeserializationSchema.class, schema.getClass(), 0,
-				null, null);
+	public TypeInformation<OUT> getProducedType() {
+		return schema.getProducedType();
 	}
-
 }
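With this change, ConnectorSource no longer invokes the TypeExtractor itself; it simply forwards the schema's own getProducedType(). A sketch of a custom schema under that contract — the method set (deserialize, isEndOfStream, getProducedType) is inferred from the calls made elsewhere in this commit, and the class name is made up for illustration:

	import java.nio.charset.Charset;

	import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
	import org.apache.flink.api.common.typeinfo.TypeInformation;
	import org.apache.flink.streaming.util.serialization.DeserializationSchema;

	public class UpperCaseStringSchema implements DeserializationSchema<String> {

		private static final long serialVersionUID = 1L;

		@Override
		public String deserialize(byte[] message) {
			// hypothetical transformation: decode the raw bytes as UTF-8 and upper-case them
			return new String(message, Charset.forName("UTF-8")).toUpperCase();
		}

		@Override
		public boolean isEndOfStream(String nextElement) {
			return false; // never end the stream based on an element
		}

		@Override
		public TypeInformation<String> getProducedType() {
			return BasicTypeInfo.STRING_TYPE_INFO;
		}
	}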

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index 1bf2962..fe6684d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -38,7 +38,6 @@ public class KafkaConsumerExample {
 
 		DataStream<String> kafkaStream = env
 				.addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
-
 		kafkaStream.print();
 
 		env.execute();

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
deleted file mode 100644
index d43460b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-
-public class KafkaSimpleConsumerExample {
-
-	private static String host;
-	private static int port;
-	private static String topic;
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
-		DataStream<String> kafkaStream = env
-				.addSource(new PersistentKafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
-
-		kafkaStream.print();
-
-		env.execute();
-	}
-
-	private static boolean parseParameters(String[] args) {
-		if (args.length == 3) {
-			host = args[0];
-			port = Integer.parseInt(args[1]);
-			topic = args[2];
-			return true;
-		} else {
-			System.err.println("Usage: KafkaConsumerExample <host> <port> <topic>");
-			return false;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
index a29ba4d..4286196 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
 import org.apache.flink.runtime.util.DataOutputSerializer;
@@ -30,7 +29,7 @@ import java.io.IOException;
 
 public class Utils {
 	public static class TypeInformationSerializationSchema<T>
-			implements DeserializationSchema<T>, SerializationSchema<T, byte[]>, ResultTypeQueryable<T> {
+			implements DeserializationSchema<T>, SerializationSchema<T, byte[]> {
 		private final TypeSerializer<T> serializer;
 		private final TypeInformation<T> ti;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 85c4d35..0965b29 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -38,6 +37,7 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import kafka.serializer.DefaultEncoder;
 
+
 /**
  * Sink that emits its inputs to a Kafka topic.
  *
@@ -53,7 +53,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	private Producer<IN, byte[]> producer;
 	private Properties userDefinedProperties;
 	private String topicId;
-	private String zookeeperAddress;
+	private String brokerList;
 	private SerializationSchema<IN, byte[]> schema;
 	private SerializableKafkaPartitioner partitioner;
 	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
@@ -62,16 +62,16 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 * Creates a KafkaSink for a given topic. The sink produces its input to
 	 * the topic.
 	 *
-	 * @param zookeeperAddress
-	 * 		Address of the Zookeeper host (with port number).
+	 * @param brokerList
+	 *			Addresses of the brokers
 	 * @param topicId
 	 * 		ID of the Kafka topic.
 	 * @param serializationSchema
 	 * 		User defined serialization schema.
 	 */
-	public KafkaSink(String zookeeperAddress, String topicId,
+	public KafkaSink(String brokerList, String topicId,
 			SerializationSchema<IN, byte[]> serializationSchema) {
-		this(zookeeperAddress, topicId, new Properties(), serializationSchema);
+		this(brokerList, topicId, new Properties(), serializationSchema);
 	}
 
 	/**
@@ -79,8 +79,8 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 * If you use this constructor, the broker should be set with the "metadata.broker.list"
 	 * configuration.
 	 *
-	 * @param zookeeperAddress
-	 * 		Address of the Zookeeper host (with port number).
+	 * @param brokerList
+	 * 		Addresses of the brokers
 	 * @param topicId
 	 * 		ID of the Kafka topic.
 	 * @param producerConfig
@@ -88,13 +88,15 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 * @param serializationSchema
 	 * 		User defined serialization schema.
 	 */
-	public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
+	public KafkaSink(String brokerList, String topicId, Properties producerConfig,
 			SerializationSchema<IN, byte[]> serializationSchema) {
-		NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
+		String[] elements = brokerList.split(",");
+		for(String broker: elements) {
+			NetUtils.ensureCorrectHostnamePort(broker);
+		}
 		Preconditions.checkNotNull(topicId, "TopicID not set");
-		ClosureCleaner.ensureSerializable(partitioner);
 
-		this.zookeeperAddress = zookeeperAddress;
+		this.brokerList = brokerList;
 		this.topicId = topicId;
 		this.schema = serializationSchema;
 		this.partitionerClass = null;
@@ -105,8 +107,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 * Creates a KafkaSink for a given topic. The sink produces its input to
 	 * the topic.
 	 *
-	 * @param zookeeperAddress
-	 * 		Address of the Zookeeper host (with port number).
+	 * @param brokerList
 	 * @param topicId
 	 * 		ID of the Kafka topic.
 	 * @param serializationSchema
@@ -114,15 +115,18 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 * @param partitioner
 	 * 		User defined partitioner.
 	 */
-	public KafkaSink(String zookeeperAddress, String topicId,
+	public KafkaSink(String brokerList, String topicId,
 			SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
-		this(zookeeperAddress, topicId, serializationSchema);
+		this(brokerList, topicId, serializationSchema);
+		ClosureCleaner.ensureSerializable(partitioner);
 		this.partitioner = partitioner;
 	}
 
-	public KafkaSink(String zookeeperAddress, String topicId,
-			SerializationSchema<IN, byte[]> serializationSchema, Class<? extends SerializableKafkaPartitioner> partitioner) {
-		this(zookeeperAddress, topicId, serializationSchema);
+	public KafkaSink(String brokerList,
+			String topicId,
+			SerializationSchema<IN, byte[]> serializationSchema,
+			Class<? extends SerializableKafkaPartitioner> partitioner) {
+		this(brokerList, topicId, serializationSchema);
 		this.partitionerClass = partitioner;
 	}
 
@@ -132,16 +136,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	@Override
 	public void open(Configuration configuration) {
 
-		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperAddress);
-		String listOfBrokers = kafkaTopicUtils.getBrokerList(topicId);
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Broker list: {}", listOfBrokers);
-		}
-
 		Properties properties = new Properties();
 
-		properties.put("metadata.broker.list", listOfBrokers);
+		properties.put("metadata.broker.list", brokerList);
 		properties.put("request.required.acks", "-1");
 		properties.put("message.send.max.retries", "10");
 
@@ -168,7 +165,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 		try {
 			producer = new Producer<IN, byte[]>(config);
 		} catch (NullPointerException e) {
-			throw new RuntimeException("Cannot connect to Kafka broker " + listOfBrokers, e);
+			throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
 		}
 	}
 

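Since the constructor now takes the broker list directly (instead of a Zookeeper address that was previously resolved via KafkaTopicUtils), wiring up the sink might look like the following sketch; the broker addresses and topic are placeholders, and it assumes an existing DataStream<String> named "stream" plus JavaDefaultStringSchema on the serialization side:

	stream.addSink(new KafkaSink<String>(
			"broker1:9092,broker2:9092",   // comma-separated broker list; each entry is validated as host:port
			"my-topic",                    // placeholder topic
			new JavaDefaultStringSchema()));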
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index a0805c0..28e338c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -47,7 +47,6 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
-	@SuppressWarnings("unused")
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
 
 	private final String zookeeperAddress;

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
new file mode 100644
index 0000000..50ee15a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.persistent;
+
+import com.google.common.base.Preconditions;
+import kafka.common.TopicAndPartition;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Source for reading from Kafka using Flink Streaming Fault Tolerance.
+ * This source updates the committed offsets in Zookeeper based on Flink's internal checkpointing.
+ *
+ * Note that Kafka's autocommit feature must be disabled when using this source.
+ */
+public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> implements
+		ResultTypeQueryable<OUT>,
+		CheckpointCommitter,
+		CheckpointedAsynchronously<long[]> {
+	private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
+
+	protected transient ConsumerConfig consumerConfig;
+	private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
+	private transient ConsumerConnector consumer;
+
+	private String topicName;
+	private DeserializationSchema<OUT> deserializationSchema;
+	private boolean running = true;
+
+	private transient long[] lastOffsets;
+	private transient ZkClient zkClient;
+	private transient long[] commitedOffsets; // maintain committed offsets, to avoid committing the same over and over again.
+
+
+	/**
+	 *
+	 * For the @param consumerConfig, specify at least the "group.id" and "zookeeper.connect" strings.
+	 * The config will be passed to the Kafka high level consumer.
+	 * For a full list of possible values, see https://kafka.apache.org/documentation.html#consumerconfigs
+	 */
+	public PersistentKafkaSource(String topicName, DeserializationSchema<OUT> deserializationSchema, ConsumerConfig consumerConfig) {
+		Preconditions.checkNotNull(topicName);
+		Preconditions.checkNotNull(deserializationSchema);
+		Preconditions.checkNotNull(consumerConfig);
+
+		this.topicName = topicName;
+		this.deserializationSchema = deserializationSchema;
+		this.consumerConfig = consumerConfig;
+		if(consumerConfig.autoCommitEnable()) {
+			throw new IllegalArgumentException("'auto.commit.enable' is set to 'true'. " +
+					"This source can only be used with auto commit disabled because the " +
+					"source is committing to zookeeper by itself (not using the KafkaConsumer).");
+		}
+		if(!consumerConfig.offsetsStorage().equals("zookeeper")) {
+			// we can currently only commit to ZK.
+			throw new IllegalArgumentException("The 'offsets.storage' has to be set to 'zookeeper' for this Source to work reliably");
+		}
+	}
+
+	// ---------------------- ParallelSourceFunction Lifecycle -----------------
+
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(this.consumerConfig);
+		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
+		// will see each message only once.
+		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
+		if(streams.size() != 1) {
+			throw new RuntimeException("Expected only one message stream but got "+streams.size());
+		}
+		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+		if(kafkaStreams == null) {
+			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+		}
+		if(kafkaStreams.size() != 1) {
+			throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
+		}
+		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, consumerConfig.groupId());
+		this.iteratorToRead = kafkaStreams.get(0).iterator();
+		this.consumer = consumer;
+
+		zkClient = new ZkClient(consumerConfig.zkConnect(),
+			consumerConfig.zkSessionTimeoutMs(),
+			consumerConfig.zkConnectionTimeoutMs(),
+			new KafkaZKStringSerializer());
+
+		// most likely the number of offsets we're going to store here will be lower than the number of partitions.
+		int numPartitions = getNumberOfPartitions();
+		LOG.debug("The topic {} has {} partitions", topicName, numPartitions);
+		this.lastOffsets = new long[numPartitions];
+		this.commitedOffsets = new long[numPartitions];
+		Arrays.fill(this.lastOffsets, -1);
+		Arrays.fill(this.commitedOffsets, 0); // just to make it clear
+	}
+
+
+	@Override
+	public void run(Collector<OUT> collector) throws Exception {
+		if(iteratorToRead == null) {
+			throw new RuntimeException("Stream to read not initialized properly. Has open() been called");
+		}
+		try {
+			while (iteratorToRead.hasNext()) {
+				if (!running) {
+					LOG.info("Source got stopped");
+					break;
+				}
+				MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
+				if(lastOffsets[message.partition()] >= message.offset()) {
+					LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
+					continue;
+				}
+				lastOffsets[message.partition()] = message.offset();
+
+				OUT out = deserializationSchema.deserialize(message.message());
+				if (deserializationSchema.isEndOfStream(out)) {
+					LOG.info("DeserializationSchema signaled end of stream for this source");
+					break;
+				}
+
+				collector.collect(out);
+				if (LOG.isTraceEnabled()) {
+					LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
+				}
+			}
+		} catch(Exception ie) {
+			// this exception is coming from Scala code.
+			if(ie instanceof InterruptedException) {
+				if(running) {
+					throw new RuntimeException("Error while reading kafka consumer", ie);
+				} else {
+					LOG.debug("Kafka consumer got interrupted because it has been cancelled. This is expected", ie);
+				}
+			} else {
+				throw ie;
+			}
+		}
+
+		LOG.info("Source has finished reading data from the KafkaStream");
+	}
+
+	@Override
+	public void cancel() {
+		LOG.info("Instructing source to stop reading data from Kafka");
+		running = false;
+	}
+
+	@Override
+	public void close() {
+		LOG.info("Closing Kafka consumer");
+		this.consumer.shutdown();
+		zkClient.close();
+	}
+
+
+	// ---------------------- State / Checkpoint handling  -----------------
+	// this source is keeping the partition offsets in Zookeeper
+
+	private Map<Long, long[]> pendingCheckpoints = new HashMap<Long, long[]>();
+
+	@Override
+	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		if(lastOffsets == null) {
+			LOG.warn("State snapshot requested on not yet opened source. Returning null");
+			return null;
+		}
+		LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}", Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+
+		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
+		pendingCheckpoints.put(checkpointId, currentOffsets);
+		return currentOffsets;
+	}
+
+	@Override
+	public void restoreState(long[] state) {
+		// we maintain the offsets in Zookeeper, so there is nothing to do here.
+	}
+
+
+	/**
+	 * Notification on completed checkpoints
+	 * @param checkpointId The ID of the checkpoint that has been completed.
+	 */
+	@Override
+	public void commitCheckpoint(long checkpointId) {
+		LOG.info("Commit checkpoint {}", checkpointId);
+		long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId);
+		if(checkpointOffsets == null) {
+			LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
+			return;
+		}
+		LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets));
+
+		for(int partition = 0; partition < checkpointOffsets.length; partition++) {
+			long offset = checkpointOffsets[partition];
+			if(offset != -1) {
+				setOffset(partition, offset);
+			}
+		}
+	}
+
+	// --------------------- Zookeeper / Offset handling -----------------------------
+
+	private int getNumberOfPartitions() {
+		scala.collection.immutable.List<String> scalaSeq = JavaConversions.asScalaBuffer(Collections.singletonList(topicName)).toList();
+		scala.collection.mutable.Map<String, Seq<Object>> list =  ZkUtils.getPartitionsForTopics(zkClient, scalaSeq);
+		Option<Seq<Object>> topicOption = list.get(topicName);
+		if(topicOption.isEmpty()) {
+			throw new IllegalArgumentException("Unable to get number of partitions for topic "+topicName+" from "+list.toString());
+		}
+		Seq<Object> topic = topicOption.get();
+		return topic.size();
+	}
+
+	protected void setOffset(int partition, long offset) {
+		if(commitedOffsets[partition] < offset) {
+			setOffset(zkClient, consumerConfig.groupId(), topicName, partition, offset);
+			commitedOffsets[partition] = offset;
+		} else {
+			LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+		}
+	}
+
+
+
+	// the following two methods are static to allow access from the outside as well (Testcases)
+
+	/**
+	 * This method's code is based on ZookeeperConsumerConnector.commitOffsetToZooKeeper()
+	 */
+	public static void setOffset(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
+		LOG.info("Setting offset for partition {} of topic {} in group {} to offset {}", partition, topic, groupId, offset);
+		TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+		ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
+	}
+
+	public static long getOffset(ZkClient zkClient, String groupId, String topic, int partition) {
+		TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+		scala.Tuple2<String, Stat> data = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition());
+		return Long.valueOf(data._1());
+	}
+
+
+	// ---------------------- (Java)Serialization methods for the consumerConfig -----------------
+
+	private void writeObject(ObjectOutputStream out)
+			throws IOException, ClassNotFoundException {
+		out.defaultWriteObject();
+		out.writeObject(consumerConfig.props().props());
+	}
+
+	private void readObject(ObjectInputStream in)
+			throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		Properties props = (Properties) in.readObject();
+		consumerConfig = new ConsumerConfig(props);
+	}
+
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return deserializationSchema.getProducedType();
+	}
+
+
+	// ---------------------- Zookeeper Serializer copied from Kafka (because it has private access there)  -----------------
+
+	public static class KafkaZKStringSerializer implements ZkSerializer {
+
+		@Override
+		public byte[] serialize(Object data) throws ZkMarshallingError {
+			try {
+				return ((String) data).getBytes("UTF-8");
+			} catch (UnsupportedEncodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+			if (bytes == null) {
+				return null;
+			} else {
+				try {
+					return new String(bytes, "UTF-8");
+				} catch (UnsupportedEncodingException e) {
+					throw new RuntimeException(e);
+				}
+			}
+		}
+	}
+}
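The two static offset helpers above are explicitly meant to be callable from the outside (e.g. test cases). A sketch of such a call, with the Zookeeper address, group, topic, partition, offset and timeouts as placeholders:

	// imports: org.I0Itec.zkclient.ZkClient, org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource
	ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000,            // placeholder address and timeouts
			new PersistentKafkaSource.KafkaZKStringSerializer());
	PersistentKafkaSource.setOffset(zkClient, "flink-example-group", "my-topic", 0, 42L);              // commit offset 42 for partition 0
	long committed = PersistentKafkaSource.getOffset(zkClient, "flink-example-group", "my-topic", 0);  // read it back
	zkClient.close();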

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
deleted file mode 100644
index 365961d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.api.TopicMetadata;
-import kafka.cluster.Broker;
-import kafka.common.LeaderNotAvailableException;
-import kafka.common.UnknownTopicOrPartitionException;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-
-/**
- * For retrieving Kafka topic information (e.g. number of partitions),
- * or creating a topic.
- */
-public class KafkaTopicUtils {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtils.class);
-
-	private ZkClient zkClient;
-
-	public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10000;
-	public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10000;
-
-	private final String zookeeperAddress;
-	private final int sessionTimeoutMs;
-	private final int connectionTimeoutMs;
-
-	private volatile boolean isRunning = false;
-
-	public KafkaTopicUtils(String zookeeperServer) {
-		this(zookeeperServer, DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
-	}
-
-	public KafkaTopicUtils(String zookeeperAddress, int sessionTimeoutMs, int connectionTimeoutMs) {
-		this.zookeeperAddress = zookeeperAddress;
-		this.sessionTimeoutMs = sessionTimeoutMs;
-		this.connectionTimeoutMs = connectionTimeoutMs;
-	}
-
-	public void createTopic(String topicName, int numOfPartitions, int replicationFactor) {
-
-		LOG.info("Creating Kafka topic '{}'", topicName);
-		Properties topicConfig = new Properties();
-		if (topicExists(topicName)) {
-			if (LOG.isWarnEnabled()) {
-				LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
-			}
-		} else {
-			LOG.info("Connecting zookeeper");
-
-			initZkClient();
-			AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
-			closeZkClient();
-		}
-	}
-
-	public String getBrokerList(String topicName) {
-		return getBrokerAddressList(getBrokerAddresses(topicName));
-	}
-
-	public String getBrokerList(String topicName, int partitionId) {
-		return getBrokerAddressList(getBrokerAddresses(topicName, partitionId));
-	}
-
-	public Set<String> getBrokerAddresses(String topicName) {
-		int numOfPartitions = getNumberOfPartitions(topicName);
-
-		HashSet<String> brokers = new HashSet<String>();
-		for (int i = 0; i < numOfPartitions; i++) {
-			brokers.addAll(getBrokerAddresses(topicName, i));
-		}
-		return brokers;
-	}
-
-	public Set<String> getBrokerAddresses(String topicName, int partitionId) {
-		PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId);
-		Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr());
-
-		HashSet<String> addresses = new HashSet<String>();
-		for (Broker broker : inSyncReplicas) {
-			addresses.add(broker.connectionString());
-		}
-		return addresses;
-	}
-
-	private static String getBrokerAddressList(Set<String> brokerAddresses) {
-		StringBuilder brokerAddressList = new StringBuilder("");
-		for (String broker : brokerAddresses) {
-			brokerAddressList.append(broker);
-			brokerAddressList.append(',');
-		}
-		brokerAddressList.deleteCharAt(brokerAddressList.length() - 1);
-
-		return brokerAddressList.toString();
-	}
-
-	public int getNumberOfPartitions(String topicName) {
-		Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata();
-		return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
-	}
-
-	public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
-		isRunning = true;
-		PartitionMetadata partitionMetadata = null;
-		while (isRunning) {
-			try {
-				partitionMetadata = getPartitionMetadata(topicName, partitionId);
-				return partitionMetadata;
-			} catch (LeaderNotAvailableException e) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Got {} trying to fetch metadata again", e.getMessage());
-				}
-			}
-		}
-		isRunning = false;
-		return partitionMetadata;
-	}
-
-	public PartitionMetadata getPartitionMetadata(String topicName, int partitionId) {
-		PartitionMetadata partitionMetadata = getPartitionMetadataWithErrorCode(topicName, partitionId);
-		switch (partitionMetadata.errorCode()) {
-			case 0:
-				return partitionMetadata;
-			case 3:
-				throw new UnknownTopicOrPartitionException("While fetching metadata for " + topicName + " / " + partitionId);
-			case 5:
-				throw new LeaderNotAvailableException("While fetching metadata for " + topicName + " / " + partitionId);
-				default:
-					throw new RuntimeException("Unknown error occurred while fetching metadata for "
-							+ topicName + " / " + partitionId + ", with error code: " + partitionMetadata.errorCode());
-		}
-	}
-
-	private PartitionMetadata getPartitionMetadataWithErrorCode(String topicName, int partitionId) {
-		TopicMetadata topicInfo = getTopicMetadata(topicName);
-
-		Collection<PartitionMetadata> partitions = JavaConversions.asJavaCollection(topicInfo.partitionsMetadata());
-
-		Iterator<PartitionMetadata> iterator = partitions.iterator();
-		for (PartitionMetadata partition : partitions) {
-			if (partition.partitionId() == partitionId) {
-				return partition;
-			}
-		}
-
-		throw new RuntimeException("No such partition: " + topicName + " / " + partitionId);
-	}
-
-	public TopicMetadata getTopicMetadata(String topicName) {
-		TopicMetadata topicMetadata = getTopicMetadataWithErrorCode(topicName);
-		switch (topicMetadata.errorCode()) {
-			case 0:
-				return topicMetadata;
-			case 3:
-				throw new UnknownTopicOrPartitionException("While fetching metadata for topic " + topicName);
-			case 5:
-				throw new LeaderNotAvailableException("While fetching metadata for topic " + topicName);
-			default:
-				throw new RuntimeException("Unknown error occurred while fetching metadata for topic "
-						+ topicName + ", with error code: " + topicMetadata.errorCode());
-		}
-	}
-
-	private TopicMetadata getTopicMetadataWithErrorCode(String topicName) {
-		if (topicExists(topicName)) {
-			initZkClient();
-			TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
-			closeZkClient();
-
-			return topicMetadata;
-		} else {
-			throw new RuntimeException("Topic does not exist: " + topicName);
-		}
-	}
-
-	public boolean topicExists(String topicName) {
-		initZkClient();
-		boolean topicExists = AdminUtils.topicExists(zkClient, topicName);
-		closeZkClient();
-
-		return topicExists;
-	}
-
-	private void initZkClient() {
-		zkClient = new ZkClient(zookeeperAddress, sessionTimeoutMs, connectionTimeoutMs,
-				new KafkaZKStringSerializer());
-		zkClient.waitUntilConnected();
-	}
-
-	private void closeZkClient() {
-		zkClient.close();
-		zkClient = null;
-	}
-
-	private static class KafkaZKStringSerializer implements ZkSerializer {
-
-		@Override
-		public byte[] serialize(Object data) throws ZkMarshallingError {
-			try {
-				return ((String) data).getBytes("UTF-8");
-			} catch (UnsupportedEncodingException e) {
-				throw new RuntimeException(e);
-			}
-		}
-
-		@Override
-		public Object deserialize(byte[] bytes) throws ZkMarshallingError {
-			if (bytes == null) {
-				return null;
-			} else {
-				try {
-					return new String(bytes, "UTF-8");
-				} catch (UnsupportedEncodingException e) {
-					throw new RuntimeException(e);
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithMetadata.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithMetadata.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithMetadata.java
deleted file mode 100644
index 3985350..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithMetadata.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple;
-
-/**
- * POJO encapsulating records received from Kafka with their offset and partition.
- */
-public class MessageWithMetadata {
-
-	private int partition;
-	private long offset;
-	private byte[] message;
-
-	public MessageWithMetadata(long offset, byte[] message, int partition) {
-		this.partition = partition;
-		this.offset = offset;
-		this.message = message;
-	}
-
-	public long getOffset() {
-		return offset;
-	}
-
-	public byte[] getMessage() {
-		return message;
-	}
-
-	public int getPartition() {
-		return partition;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
deleted file mode 100644
index a842160..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import com.google.common.base.Preconditions;
-import kafka.consumer.ConsumerConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
-import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaIdleConsumerIterator;
-import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaMultiplePartitionsIterator;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Kafka source persisting its offset through the {@link OperatorState} interface.
- * This allows the offset to be restored to the latest one that has been acknowledged
- * by the whole execution graph.
- *
- * @param <OUT>
- * 		Type of the messages on the topic.
- */
-public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> implements Checkpointed<HashMap<Integer, KafkaOffset>> {
-
-	private static final long serialVersionUID = 1L;
-	private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
-
-	public static final String WAIT_ON_EMPTY_FETCH_KEY = "flink.waitOnEmptyFetchMillis";
-	public static final String WAIT_ON_FAILED_LEADER_MS_KEY = "flink.waitOnFailedLeaderDetection";
-	public static final int WAIT_ON_FAILED_LEADER__MS_DEFAULT = 2000;
-
-	public static final String MAX_FAILED_LEADER_RETRIES_KEY = "flink.maxLeaderDetectionRetries";
-	public static final int MAX_FAILED_LEADER_RETRIES_DEFAULT = 3;
-
-	private final String topicId;
-	private final KafkaOffset startingOffset;
-	private transient ConsumerConfig consumerConfig; // ConsumerConfig is not serializable.
-
-	private transient KafkaConsumerIterator iterator;
-	private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSetOperatorState;
-
-	private transient Map<Integer, KafkaOffset> partitionOffsets;
-
-	/**
-	 * Creates a persistent Kafka source that consumes a topic.
-	 * If there are no new messages on the topic, this consumer will wait
-	 * 100 milliseconds before trying to fetch messages again.
-	 * The consumer will start consuming from the latest messages in the topic.
-	 *
-	 * @param zookeeperAddress
-	 * 		Address of the Zookeeper host (with port number).
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param deserializationSchema
-	 * 		User defined deserialization schema.
-	 */
-	public PersistentKafkaSource(String zookeeperAddress, String topicId,
-			DeserializationSchema<OUT> deserializationSchema) {
-		this(zookeeperAddress, topicId, deserializationSchema, KafkaTopicUtils.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS, 100);
-	}
-
-	/**
-	 * Creates a persistent Kafka source that consumes a topic.
-	 * If there are no new messages on the topic, this consumer will wait
-	 * waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
-	 * The consumer will start consuming from the latest messages in the topic.
-	 *
-	 * @param zookeeperAddress
-	 * 		Address of the Zookeeper host (with port number).
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param deserializationSchema
-	 * 		User defined deserialization schema.
-	 * @param zookeeperSyncTimeMillis
-	 * 		Synchronization time with zookeeper.
-	 * @param waitOnEmptyFetchMillis
-	 * 		Time to wait before trying to fetch messages again.
-	 */
-	public PersistentKafkaSource(String zookeeperAddress, String topicId,
-			DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis) {
-		this(zookeeperAddress, topicId, deserializationSchema, zookeeperSyncTimeMillis, waitOnEmptyFetchMillis, Offset.FROM_CURRENT);
-	}
-
-
-	/**
-	 * Creates a persistent Kafka source that consumes a topic.
-	 * If there are no new messages on the topic, this consumer will wait
-	 * waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
-	 *
-	 * THIS CONSTRUCTOR IS DEPRECATED: USE the constructor with the ConsumerConfig.
-	 *
-	 * @param zookeeperAddress
-	 * 		Address of the Zookeeper host (with port number).
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param deserializationSchema
-	 * 		User defined deserialization schema.
-	 * @param zookeeperSyncTimeMillis
-	 * 		Synchronization time with zookeeper.
-	 * @param waitOnEmptyFetchMillis
-	 * 		Time to wait before trying to fetch messages again.
-	 * @param startOffsetType
-	 * 		The offset to start from (beginning or current).
-	 */
-	@Deprecated
-	public PersistentKafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis, Offset startOffsetType) {
-		this(topicId, deserializationSchema, startOffsetType, legacyParametersToConsumerConfig(zookeeperAddress, zookeeperSyncTimeMillis, waitOnEmptyFetchMillis));
-	}
-
-	private static ConsumerConfig legacyParametersToConsumerConfig(String zookeeperAddress, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis) {
-		Properties props = new Properties();
-		props.setProperty("zookeeper.sync.time.ms", Integer.toString(zookeeperSyncTimeMillis));
-		props.setProperty(WAIT_ON_EMPTY_FETCH_KEY, Integer.toString(waitOnEmptyFetchMillis));
-		props.setProperty("zookeeper.connect", zookeeperAddress);
-		props.setProperty("group.id", "flink-persistent-kafka-source");
-		return new ConsumerConfig(props);
-	}
-
-	/**
-	 * Creates a persistent Kafka source that consumes a topic.
-	 * If there are no new messages on the topic, this consumer will wait
-	 * waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
-	 *
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param deserializationSchema
-	 * 		User defined deserialization schema.
-	 * @param startOffsetType
-	 * 		The offset to start from (beginning or current).
-	 * @param consumerConfig
-	 * 		Additional configuration for the PersistentKafkaSource.
-	 * 		NOTE: This source will only respect certain configuration values from the config!
-	 */
-	public PersistentKafkaSource(String topicId, DeserializationSchema<OUT> deserializationSchema, Offset startOffsetType, ConsumerConfig consumerConfig) {
-		super(deserializationSchema);
-		Preconditions.checkNotNull(topicId, "The topic id can not be null");
-		Preconditions.checkNotNull(deserializationSchema, "The deserialization schema can not be null");
-		Preconditions.checkNotNull(consumerConfig, "ConsumerConfig can not be null");
-
-		this.consumerConfig = consumerConfig;
-
-		this.topicId = topicId;
-
-		switch (startOffsetType) {
-			case FROM_BEGINNING:
-				this.startingOffset = new BeginningOffset();
-				break;
-			case FROM_CURRENT:
-				this.startingOffset = new CurrentOffset();
-				break;
-			default:
-				this.startingOffset = new CurrentOffset();
-				break;
-		}
-	}
-
-	// ---------------------- Source lifecycle methods (open / run / cancel ) -----------------
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void open(Configuration parameters) throws InterruptedException {
-		LOG.info("Starting PersistentKafkaSource");
-		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		int indexOfSubtask = context.getIndexOfThisSubtask();
-		int numberOfSubtasks = context.getNumberOfParallelSubtasks();
-
-		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(consumerConfig.zkConnect(), consumerConfig.zkSyncTimeMs(), consumerConfig.zkConnectionTimeoutMs());
-
-		int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId);
-
-		if (indexOfSubtask >= numberOfPartitions) {
-			LOG.info("Creating idle consumer because this subtask ({}) is higher than the number partitions ({})", indexOfSubtask + 1, numberOfPartitions);
-			iterator = new KafkaIdleConsumerIterator();
-		}
-		else {
-			if (partitionOffsets != null) {
-				// we have restored state
-				LOG.info("Initializing PersistentKafkaSource from existing state.");
-			}
-			else {
-				LOG.info("No existing state found. Creating new");
-				partitionOffsets = new HashMap<Integer, KafkaOffset>();
-
-				for (int partitionIndex = indexOfSubtask; partitionIndex < numberOfPartitions; partitionIndex += numberOfSubtasks) {
-					partitionOffsets.put(partitionIndex, startingOffset);
-				}
-
-				kafkaOffSetOperatorState = new OperatorState<Map<Integer, KafkaOffset>>(partitionOffsets);
-			}
-
-			iterator = new KafkaMultiplePartitionsIterator(topicId, partitionOffsets, kafkaTopicUtils, this.consumerConfig);
-
-			if (LOG.isInfoEnabled()) {
-				LOG.info("PersistentKafkaSource ({}/{}) listening to partitionOffsets {} of topic {}.",
-						indexOfSubtask + 1, numberOfSubtasks, partitionOffsets.keySet(), topicId);
-			}
-		}
-
-		iterator.initialize();
-	}
-
-	@Override
-	public void run(Collector<OUT> collector) throws Exception {
-		MessageWithMetadata msg;
-		while (iterator.hasNext()) {
-			msg = iterator.nextWithOffset();
-			OUT out = schema.deserialize(msg.getMessage());
-
-			if (schema.isEndOfStream(out)) {
-				break;
-			}
-
-			collector.collect(out);
-
-			// TODO avoid object creation
-			partitionOffsets.put(msg.getPartition(), new GivenOffset(msg.getOffset()));
-			kafkaOffSetOperatorState.update(partitionOffsets);
-		}
-	}
-
-	@Override
-	public void cancel() {
-		LOG.info("PersistentKafkaSource has been cancelled");
-	}
-
-
-
-	// ---------------------- (Java)Serialization methods for the consumerConfig -----------------
-
-	private void writeObject(ObjectOutputStream out)
-			throws IOException, ClassNotFoundException {
-		out.defaultWriteObject();
-		out.writeObject(consumerConfig.props().props());
-	}
-
-	private void readObject(ObjectInputStream in)
-			throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		Properties props = (Properties) in.readObject();
-		consumerConfig = new ConsumerConfig(props);
-	}
-
-	@Override
-	public HashMap<Integer, KafkaOffset> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		return new HashMap<Integer, KafkaOffset>(this.partitionOffsets);
-	}
-
-	@Override
-	public void restoreState(HashMap<Integer, KafkaOffset> state) {
-		this.partitionOffsets = state;
-	}
-}
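
For orientation, the removed class was typically wired into a job as shown in the
following sketch. It assumes a ZooKeeper quorum on localhost:2181, a topic named
"myTopic" and the bundled SimpleStringSchema; only the PersistentKafkaSource
constructor comes from the class above, while the surrounding class name, topic and
address are illustrative.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class PersistentKafkaSourceUsage {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// consume from the latest offsets; on recovery the checkpointed
		// offsets are handed back through restoreState() above
		DataStream<String> messages = env.addSource(
				new PersistentKafkaSource<String>("localhost:2181", "myTopic", new SimpleStringSchema()));

		messages.print();
		env.execute("Persistent Kafka source usage");
	}
}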

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
deleted file mode 100644
index 42fe003..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
-
-import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
-
-/**
- * Iterator interface for different types of Kafka consumers.
- */
-public interface KafkaConsumerIterator {
-
-	void initialize() throws InterruptedException;
-
-	boolean hasNext();
-
-	/**
-	 * Returns the next message received from Kafka as a
-	 * byte array.
-	 *
-	 * @return next message as a byte array.
-	 */
-	byte[] next() throws InterruptedException;
-
-	/**
-	 * Returns the next message and its offset received from
-	 * Kafka encapsulated in a POJO.
-	 *
-	 * @return next message and its offset.
-	 */
-	MessageWithMetadata nextWithOffset() throws InterruptedException;
-}
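
Both consumers below implement this contract. Purely as an illustration of the
interface (the following class is hypothetical and not part of the removed code),
a trivial in-memory implementation could look like this:

import java.util.Iterator;
import java.util.List;

import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;

public class InMemoryConsumerIterator implements KafkaConsumerIterator {

	private final List<byte[]> records;
	private Iterator<byte[]> current;
	private long offset;

	public InMemoryConsumerIterator(List<byte[]> records) {
		this.records = records;
	}

	@Override
	public void initialize() {
		this.current = records.iterator();
		this.offset = 0;
	}

	@Override
	public boolean hasNext() {
		return current.hasNext();
	}

	@Override
	public byte[] next() {
		return nextWithOffset().getMessage();
	}

	@Override
	public MessageWithMetadata nextWithOffset() {
		// pretend all records come from partition 0, with consecutive offsets
		return new MessageWithMetadata(offset++, current.next(), 0);
	}
}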

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
deleted file mode 100644
index 1935118..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
-
-import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * No-op iterator. Used when there are more source tasks than Kafka partitions.
- */
-public class KafkaIdleConsumerIterator implements KafkaConsumerIterator {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaIdleConsumerIterator.class);
-
-	public KafkaIdleConsumerIterator() {
-		if (LOG.isWarnEnabled()) {
-			LOG.warn("Idle Kafka consumer created. The subtask does nothing.");
-		}
-	}
-
-
-	@Override
-	public void initialize() throws InterruptedException {
-
-	}
-
-	@Override
-	public boolean hasNext() {
-		return false;
-	}
-
-	@Override
-	public byte[] next() {
-		throw new RuntimeException("Idle consumer has no input.");
-	}
-
-	@Override
-	public MessageWithMetadata nextWithOffset() {
-		throw new RuntimeException("Idle consumer has no input.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
deleted file mode 100644
index daebaaf..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import kafka.consumer.ConsumerConfig;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
-import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Iterator over multiple Kafka partitions.
- *
- * This is needed when the number of partitions exceeds the number of Kafka source tasks.
- */
-public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaMultiplePartitionsIterator.class);
-
-	protected List<KafkaSinglePartitionIterator> partitions;
-	protected final ConsumerConfig consumerConfig;
-
-	public KafkaMultiplePartitionsIterator(String topic,
-										Map<Integer, KafkaOffset> partitionsWithOffset,
-										KafkaTopicUtils kafkaTopicUtils, ConsumerConfig consumerConfig) {
-		partitions = new ArrayList<KafkaSinglePartitionIterator>(partitionsWithOffset.size());
-
-		this.consumerConfig = consumerConfig;
-
-		for (Map.Entry<Integer, KafkaOffset> partitionWithOffset : partitionsWithOffset.entrySet()) {
-			partitions.add(new KafkaSinglePartitionIterator(
-					topic,
-					partitionWithOffset.getKey(),
-					partitionWithOffset.getValue(),
-					kafkaTopicUtils,
-					this.consumerConfig));
-		}
-	}
-
-	@Override
-	public void initialize() throws InterruptedException {
-		LOG.info("Initializing iterator with {} partitions", partitions.size());
-		String partInfo = "";
-		for (KafkaSinglePartitionIterator partition : partitions) {
-			partition.initialize();
-			partInfo += partition.toString() + " ";
-		}
-		LOG.info("Initialized partitions {}", partInfo);
-	}
-
-	@Override
-	public boolean hasNext() {
-		return true;
-	}
-
-	@Override
-	public byte[] next() throws InterruptedException {
-		return nextWithOffset().getMessage();
-	}
-
-	protected int lastCheckedPartitionIndex = -1;
-	private boolean gotNewMessage = false;
-
-	@Override
-	public MessageWithMetadata nextWithOffset() throws InterruptedException {
-		KafkaSinglePartitionIterator partition;
-
-		while (true) {
-			for (int i = nextPartition(lastCheckedPartitionIndex); i < partitions.size(); i = nextPartition(i)) {
-				partition = partitions.get(i);
-
-				if (partition.fetchHasNext()) {
-					gotNewMessage = true;
-					lastCheckedPartitionIndex = i;
-					return partition.nextWithOffset();
-				}
-			}
-
-			// do not wait if a new message has been fetched
-			if (!gotNewMessage) {
-				try {
-					Thread.sleep(consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY, consumerConfig.fetchWaitMaxMs()));
-				} catch (InterruptedException e) {
-					LOG.warn("Interrupted while waiting for new messages", e);
-				}
-			}
-
-			gotNewMessage = false;
-		}
-	}
-
-	protected int nextPartition(int currentPartition) {
-		return (currentPartition + 1) % partitions.size();
-	}
-}
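
Two pieces of arithmetic drive the partition handling removed by this commit: the
modulo assignment of partitions to subtasks in PersistentKafkaSource#open(), and
the round-robin stepping of nextPartition() above. A small self-contained
illustration (the partition and subtask counts are made up):

public class PartitionAssignmentSketch {
	public static void main(String[] args) {
		int numberOfPartitions = 5;
		int numberOfSubtasks = 2;

		// open(): subtask i reads partitions i, i + numberOfSubtasks, i + 2 * numberOfSubtasks, ...
		for (int subtask = 0; subtask < numberOfSubtasks; subtask++) {
			StringBuilder assigned = new StringBuilder();
			for (int p = subtask; p < numberOfPartitions; p += numberOfSubtasks) {
				assigned.append(p).append(' ');
			}
			System.out.println("subtask " + subtask + " -> partitions " + assigned);
		}
		// prints: subtask 0 -> partitions 0 2 4
		//         subtask 1 -> partitions 1 3

		// nextPartition(): each subtask then sweeps its assigned partitions round-robin
		int assignedCount = 3; // e.g. subtask 0 above
		int current = -1;      // lastCheckedPartitionIndex starts at -1
		for (int step = 0; step < 6; step++) {
			current = (current + 1) % assignedCount;
			System.out.print(current + " "); // 0 1 2 0 1 2
		}
	}
}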