Posted to commits@flink.apache.org by rm...@apache.org on 2015/03/19 20:59:01 UTC

[3/3] flink git commit: [FLINK-1752][streaming] Add KafkaITCase and various bugfixes

[FLINK-1752][streaming] Add KafkaITCase and various bugfixes

This closes #500


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0917f23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0917f23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0917f23

Branch: refs/heads/master
Commit: c0917f23700bdef499ede8cf079aceb37bfe2710
Parents: b18c66b
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Mar 16 17:16:05 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Mar 19 20:58:05 2015 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  46 ++-
 .../java/org/apache/flink/util/NetUtils.java    |  24 ++
 .../java/org/apache/flink/api/java/DataSet.java |   3 +-
 .../librarycache/BlobLibraryCacheManager.java   |  12 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +-
 .../jobgraph/tasks/BarrierTransceiver.java      |   4 +-
 .../flink/runtime/state/LocalStateHandle.java   |  26 +-
 .../apache/flink/runtime/state/StateHandle.java |   2 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   1 +
 .../runtime/taskmanager/TaskExecutionState.java |  80 +++--
 .../StreamCheckpointCoordinator.scala           |   6 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   2 +-
 .../flink-streaming-connectors/pom.xml          |  33 +-
 .../connectors/kafka/api/KafkaSink.java         |  85 ++++--
 .../connectors/kafka/api/KafkaSource.java       |  52 +++-
 .../kafka/api/config/PartitionerWrapper.java    |  49 +++
 .../kafka/api/simple/KafkaTopicUtils.java       |  33 +-
 .../kafka/api/simple/PersistentKafkaSource.java |  84 +++++-
 .../kafka/api/simple/SimpleKafkaSource.java     |  70 -----
 .../KafkaMultiplePartitionsIterator.java        |  25 +-
 .../iterator/KafkaOnePartitionIterator.java     | 300 -------------------
 .../iterator/KafkaSinglePartitionIterator.java  | 300 +++++++++++++++++++
 .../kafka/api/simple/offset/KafkaOffset.java    |   3 +-
 .../kafka/api/simple/offset/Offset.java         |  32 ++
 .../connectors/kafka/config/EncoderWrapper.java |  46 ---
 .../kafka/config/KafkaConfigWrapper.java        |  60 ----
 .../kafka/config/PartitionerWrapper.java        |  47 ---
 .../kafka/config/StringSerializer.java          |  44 ---
 .../partitioner/KafkaConstantPartitioner.java   |   5 +-
 .../partitioner/KafkaDistributePartitioner.java |  41 ---
 .../kafka/partitioner/KafkaPartitioner.java     |  26 --
 .../SerializableKafkaPartitioner.java           |  24 ++
 .../streaming/connectors/kafka/KafkaITCase.java | 252 ++++++++++++++++
 .../connectors/kafka/StringSerializerTest.java  |  69 -----
 .../src/test/resources/log4j-test.properties    |  10 +-
 .../streaming/api/datastream/DataStream.java    |   2 +-
 .../environment/StreamExecutionEnvironment.java |   2 +
 .../api/invokable/operator/co/CoInvokable.java  |   2 +
 .../api/streamvertex/StreamVertex.java          |  10 +-
 .../src/test/resources/log4j.properties         |  10 +-
 40 files changed, 1049 insertions(+), 875 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 302a839..6d62b75 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -29,7 +29,7 @@ Introduction
 ------------
 
 
-Flink Streaming is an extension of the batch Flink API for high-throughput, low-latency data stream processing. The system can connect to and process data streams from many data sources like Apache Kafka RabbitMQ, Apache Flume, Twitter and also from any user defined data source. Data streams can be transformed and modified using high-level functions similar to the ones provided by the batch processing API. Flink Streaming provides native support for iterative stream processing. The processed data can be pushed to different output types.
+Flink Streaming is an extension of the batch Flink API for high-throughput, low-latency data stream processing. The system can connect to and process data streams from many data sources like Apache Kafka, RabbitMQ, Apache Flume, Twitter and also from any user defined data source. Data streams can be transformed and modified using high-level functions similar to the ones provided by the batch processing API. Flink Streaming provides native support for iterative stream processing. The processed data can be pushed to different output types.
 
 Flink Streaming API
 -----------
@@ -1159,16 +1159,16 @@ This connector provides access to data streams from [Apache Kafka](https://kafka
 #### Installing Apache Kafka
 * Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
 * On 32 bit computers [this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in) problem may occur. 
-* If the Kafka zookeeper and server are running on a remote machine then in the config/server.properties file the advertised.host.name must be set to the machine's IP address.
+* If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
 
 #### Kafka Source
-A class providing an interface for receiving data from Kafka.
+The standard `KafkaSource` is a Kafka consumer providing access to one topic.
 
-The followings have to be provided for the `KafkaSource(…)` constructor in order:
+The following parameters have to be provided for the `KafkaSource(...)` constructor:
 
 1. Zookeeper hostname
 2. The topic name
-3. Deserialisation schema
+3. Deserialization schema
 
 Example:
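
A minimal sketch of such an example (the `SimpleStringSchema` name is an assumption; any `DeserializationSchema` implementation for `String` records works the same way):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Consume the "test" topic through the Zookeeper instance at localhost:2181;
// the schema turns the raw Kafka bytes into String records.
DataStream<String> stream = env
		.addSource(new KafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
{% endhighlight %}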
 
@@ -1190,15 +1190,33 @@ stream = env
 </div>
 
 #### Persistent Kafka Source
-As Kafka persists all their data, a fault tolerant Kafka source can be provided.
+As Kafka persists all the data, a fault tolerant Kafka source can be provided.
 
-The PersistentKafkaSource can read a topic, and if the job fails for some reason, when restarting the source will continue on reading from where it left off. For example if there are 3 partitions in the topic with offsets 31, 122, 110 read at the time of job failure, then at the time of restart it will continue on reading from those offsets, no matter whether these partitions have new messages.
+The PersistentKafkaSource can read a topic, and if the job fails for some reason, the source will
+continue reading from where it left off after a restart.
+For example, if there are 3 partitions in the topic with offsets 31, 122, 110 read at the time of job
+failure, then at the time of restart it will continue reading from those offsets, no matter whether these partitions have new messages.
 
-The followings have to be provided for the `PersistentKafkaSource(…)` constructor in order:
+To use fault tolerant Kafka sources, monitoring of the topology needs to be enabled on the execution environment:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableMonitoring(5000);
+{% endhighlight %}
+</div>
+</div>
+
+Also note that Flink can only restart the topology if enough processing slots are available.
+So if the topology fails due to the loss of a TaskManager, there must still be enough slots available afterwards.
+Flink on YARN supports automatic restart of lost YARN containers.
+
+The following arguments have to be provided for the `PersistentKafkaSource(...)` constructor:
 
 1. The address of the Zookeeper host (with port number)
 2. The topic name
-3. Deserialisation schema
+3. Deserialization schema
 
 Example:
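
A minimal sketch of such an example (again assuming a `SimpleStringSchema`-style deserialization schema; note that the constructor takes the Zookeeper address first, then the topic name):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableMonitoring(5000);

// Fault tolerant source: the read offsets are checkpointed, so after a failure
// the source resumes from where it left off instead of skipping or re-reading data.
DataStream<String> stream = env
		.addSource(new PersistentKafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
{% endhighlight %}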
 
@@ -1245,7 +1263,7 @@ More about Kafka can be found [here](https://kafka.apache.org/documentation.html
 
 ### Apache Flume
 
-This connector provides access to datastreams from [Apache Flume](http://flume.apache.org/).
+This connector provides access to data streams from [Apache Flume](http://flume.apache.org/).
 
 #### Installing Apache Flume
 [Download](http://flume.apache.org/download.html) Apache Flume. A configuration file is required for starting agents in Flume. A configuration file for running the example can be found [here](#config_file).
@@ -1257,7 +1275,7 @@ The followings have to be provided for the `FlumeSource(…)` constructor in ord
 
 1. The hostname
 2. The port number
-3. Deserialisation schema
+3. Deserialization schema
 
 Example:
 
@@ -1337,7 +1355,7 @@ More on Flume can be found [here](http://flume.apache.org).
 
 ### RabbitMQ
 
-This connector provides access to datastreams from [RabbitMQ](http://www.rabbitmq.com/).
+This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/).
 
 ##### Installing RabbitMQ
 Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.com/download.html). After the installation the server automatically starts and the application connecting to RabbitMQ can be launched.
@@ -1350,7 +1368,7 @@ The followings have to be provided for the `RMQSource(…)` constructor in order
 
 1. The hostname
 2. The queue name
-3. Deserialisation schema
+3. Deserialization schema
 
 Example:
 
@@ -1492,7 +1510,7 @@ mvn assembly:assembly
 This creates an assembly jar under *flink-streaming-connectors/target*. 
 
 #### RabbitMQ
-Pull the image:
+Pull the Docker image:
 
 ~~~bash
 sudo docker pull flinkstreaming/flink-connectors-rabbitmq 

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index c4538ec..070a650 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -18,6 +18,9 @@
 package org.apache.flink.util;
 
 
+import java.net.MalformedURLException;
+import java.net.URL;
+
 public class NetUtils {
 	
 	/**
@@ -38,4 +41,25 @@ public class NetUtils {
 			return fqdn.substring(0, dotPos);
 		}
 	}
+
+	/**
+	 * Method to validate if the given String represents a hostname:port.
+	 *
+	 * Also works for IPv6 addresses.
+	 *
+	 * See: http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
+	 */
+	public static void ensureCorrectHostnamePort(String hostPort) {
+		try {
+			URL u = new URL("http://"+hostPort);
+			if(u.getHost() == null) {
+				throw new IllegalArgumentException("The given host:port ('"+hostPort+"') doesn't contain a valid host");
+			}
+			if(u.getPort() == -1) {
+				throw new IllegalArgumentException("The given host:port ('"+hostPort+"') doesn't contain a valid port");
+			}
+		} catch (MalformedURLException e) {
+			throw new IllegalArgumentException("The given host:port ('"+hostPort+"') is invalid", e);
+		}
+	}
 }
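
As a usage sketch (illustrative values only), this is the kind of check the KafkaSink below now performs on the Zookeeper address it is given:

    // Accepts "host:port" strings, including bracketed IPv6 literals such as "[::1]:2181".
    NetUtils.ensureCorrectHostnamePort("localhost:2181");  // passes
    NetUtils.ensureCorrectHostnamePort("localhost");       // throws IllegalArgumentException: no port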

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index a884f6d..b045100 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -183,8 +183,9 @@ public abstract class DataSet<T> {
 	public <F> F clean(F f) {
 		if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
 			ClosureCleaner.clean(f, true);
+		} else {
+			ClosureCleaner.ensureSerializable(f);
 		}
-		ClosureCleaner.ensureSerializable(f);
 		return f;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 4dba50a..680f968 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -127,7 +127,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 					throw new IOException("Library cache could not register the user code libraries.", t);
 				}
 				
-				URLClassLoader classLoader = new URLClassLoader(urls);
+				URLClassLoader classLoader = new FlinkUserCodeClassLoader(urls);
 				cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, classLoader, task));
 			}
 			else {
@@ -304,4 +304,14 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 			return referenceHolders.size();
 		}
 	}
+
+	/**
+	 * Give the URLClassLoader a nicer name for debugging purposes.
+	 */
+	private static class FlinkUserCodeClassLoader extends URLClassLoader {
+
+		public FlinkUserCodeClassLoader(URL[] urls) {
+			super(urls);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 01495f1..46ec445 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -606,7 +606,7 @@ public class ExecutionGraph implements Serializable {
 					attempt.cancelingComplete();
 					return true;
 				case FAILED:
-					attempt.markFailed(state.getError());
+					attempt.markFailed(state.getError(userClassLoader));
 					return true;
 				default:
 					// we mark as failed and return false, which triggers the TaskManager

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
index 0a8642e..a867b57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
@@ -18,6 +18,8 @@
 package org.apache.flink.runtime.jobgraph.tasks;
 
 
+import java.io.IOException;
+
 /**
  * A BarrierTransceiver describes an operator's barrier checkpointing behavior used for 
  * fault tolerance. In the most common case [[broadcastBarrier]] is being expected to be called 
@@ -36,6 +38,6 @@ public interface BarrierTransceiver {
 	 * A callback for confirming that a barrier checkpoint is complete
 	 * @param barrierID
 	 */
-	public void confirmBarrier(long barrierID);
+	public void confirmBarrier(long barrierID) throws IOException;
 	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index ac40bf8..98712b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -18,24 +18,36 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.util.InstantiationUtil;
 
+import java.io.IOException;
 import java.util.Map;
 
 /**
  * A StateHandle that includes a copy of the state itself. This state handle is recommended for 
- * cases where the operatorState is lightweight enough to pass throughout the network. 
- * 
+ * cases where the operatorState is lightweight enough to pass throughout the network.
+ *
+ * State is kept in a byte[] because it may contain user classes, which Akka is not able to handle.
  */
 public class LocalStateHandle implements StateHandle{
 	
-	private final Map<String, OperatorState<?>>  state;
+	transient private Map<String, OperatorState<?>> stateMap;
+	private final byte[] state;
 
-	public LocalStateHandle(Map<String,OperatorState<?>> state) {
-		this.state = state;
+	public LocalStateHandle(Map<String,OperatorState<?>> state) throws IOException {
+		this.stateMap = state;
+		this.state = InstantiationUtil.serializeObject(state);
 	}
 
 	@Override
-	public Map<String,OperatorState<?>> getState() {
-		return state;
+	public Map<String,OperatorState<?>> getState(ClassLoader usercodeClassloader) {
+		if(stateMap == null) {
+			try {
+				stateMap = (Map<String, OperatorState<?>>) InstantiationUtil.deserializeObject(this.state, usercodeClassloader);
+			} catch (Exception e) {
+				throw new RuntimeException("Error while deserializing the state", e);
+			}
+		}
+		return stateMap;
 	}
 }
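
A rough usage sketch of the new handle (exception handling omitted; `userCodeClassLoader` stands in for the task's user-code classloader). The same serialize-eagerly / deserialize-with-the-user-classloader pattern is applied to TaskExecutionState further down:

    Map<String, OperatorState<?>> state = new HashMap<String, OperatorState<?>>();

    // The constructor serializes the map into a byte[] right away, so Akka never
    // has to deserialize user classes while shipping the handle.
    LocalStateHandle handle = new LocalStateHandle(state);

    // On the receiving side, the user-code classloader is required to resolve
    // any user classes contained in the operator state.
    Map<String, OperatorState<?>> restored = handle.getState(userCodeClassLoader);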

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
index ddc8038..1487251 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
@@ -35,6 +35,6 @@ public interface StateHandle extends Serializable{
 	 * 
 	 * @return
 	 */
-	public Map<String,OperatorState<?>> getState();
+	public Map<String,OperatorState<?>> getState(ClassLoader userClassloader);
 	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 8cebc6c..fa2413b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -332,6 +332,7 @@ public class Task {
 	 * Starts the execution of this task.
 	 */
 	public boolean startExecution() {
+		LOG.info("Starting execution of task {}", this.getTaskName());
 		if (STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
 			final Thread thread = this.environment.getExecutingThread();
 			thread.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 101f316..bda7c8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -30,6 +26,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.util.InstantiationUtil;
 
 /**
  * This class represents an update about a task's execution state.
@@ -44,7 +41,9 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 
 	private ExecutionState executionState;
 
-	private Throwable error;
+	private byte[] serializedError;
+
+	private Throwable cachedError;
 
 	
 	public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState) {
@@ -67,11 +66,15 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 		if (jobID == null || executionId == null || executionState == null) {
 			throw new NullPointerException();
 		}
-		
+
 		this.jobID = jobID;
 		this.executionId = executionId;
 		this.executionState = executionState;
-		this.error = error;
+		try {
+			this.serializedError = InstantiationUtil.serializeObject(error);
+		} catch (IOException e) {
+			throw new RuntimeException("Error while serializing task exception", e);
+		}
 	}
 
 	/**
@@ -83,9 +86,22 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
-	public Throwable getError() {
-		return this.error;
+
+	/**
+	 * We deserialize the error with the usercode classloader.
+	 */
+	public Throwable getError(ClassLoader usercodeClassloader) {
+		if(this.serializedError == null) {
+			return null;
+		}
+		if(this.cachedError == null) {
+			try {
+				cachedError = (Throwable) InstantiationUtil.deserializeObject(this.serializedError, usercodeClassloader);
+			} catch (Exception e) {
+				throw new RuntimeException("Error while deserializing failure exception", e);
+			}
+		}
+		return this.cachedError;
 	}
 
 	/**
@@ -122,23 +138,12 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 		this.jobID.read(in);
 		this.executionId.read(in);
 		this.executionState = ExecutionState.values()[in.readInt()];
-
-		// read the exception
-		int errorDataLen = in.readInt();
-		if (errorDataLen > 0) {
-			byte[] data = new byte[errorDataLen];
-			in.readFully(data);
-			try {
-				ByteArrayInputStream bis = new ByteArrayInputStream(data);
-				ObjectInputStream ois = new ObjectInputStream(bis);
-				this.error = (Throwable) ois.readObject();
-				ois.close();
-			} catch (Throwable t) {
-				this.error = new Exception("An error occurred, but the exception could not be transfered through the RPC");
-			}
-		}
-		else {
-			this.error = null;
+		int len = in.readInt();
+		if(len == -1) {
+			this.serializedError = null;
+		} else {
+			this.serializedError = new byte[len];
+			in.read(this.serializedError);
 		}
 	}
 
@@ -149,19 +154,12 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 		out.writeInt(this.executionState.ordinal());
 
 		// transfer the exception
-		if (this.error != null) {
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			ObjectOutputStream oos = new ObjectOutputStream(baos);
-			oos.writeObject(error);
-			oos.flush();
-			oos.close();
-			
-			byte[] data = baos.toByteArray();
-			out.writeInt(data.length);
-			out.write(data);
+		if (this.serializedError == null) {
+			out.writeInt(-1);
 		}
 		else {
-			out.writeInt(0);
+			out.writeInt(this.serializedError.length);
+			out.write(this.serializedError);
 		}
 	}
 	
@@ -174,8 +172,8 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 			return other.jobID.equals(this.jobID) &&
 					other.executionId.equals(this.executionId) &&
 					other.executionState == this.executionState &&
-					(other.error == null ? this.error == null :
-						(this.error != null && other.error.getClass() == this.error.getClass()));
+					(other.cachedError == null ? this.cachedError == null :
+						(this.cachedError != null && other.cachedError.getClass() == this.cachedError.getClass()));
 			
 			// NOTE: exception equality does not work, so we can only check for same error class
 		}
@@ -192,6 +190,6 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 	@Override
 	public String toString() {
 		return String.format("TaskState jobId=%s, executionId=%s, state=%s, error=%s", 
-				jobID, executionId, executionState, error == null ? "(null)" : error.getClass().getName() + ": " + error.getMessage());
+				jobID, executionId, executionState, cachedError == null ? "(null)" : cachedError.getClass().getName() + ": " + cachedError.getMessage());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index fee69b5..b6b2cc1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -75,17 +75,17 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
     case InitBarrierScheduler =>
       context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
       context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate)
-      log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
+      log.info("Started Stream State Monitor for job {}{}",
         executionGraph.getJobID,executionGraph.getJobName)
       
     case BarrierTimeout =>
       executionGraph.getState match {
         case FAILED | CANCELED | FINISHED =>
-          log.debug("[FT-MONITOR] Stopping monitor for terminated job {}", executionGraph.getJobID)
+          log.info("Stopping monitor for terminated job {}", executionGraph.getJobID)
           self ! PoisonPill
         case _ =>
           curId += 1
-          log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
+          log.debug("Sending Barrier to vertices of Job " + executionGraph.getJobName)
           vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex &&
                   v.getExecutionState == RUNNING).foreach(vertex
           => vertex.getCurrentAssignedResource.getInstance.getTaskManager

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 086c2bd..067ffe8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -451,7 +451,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
       // register the task with the network stack and profiles
       networkEnvironment match {
         case Some(ne) =>
-          log.debug("Register task {} on {}.", task, connectionInfo)
+          log.info("Register task {} on {}.", task, connectionInfo)
           ne.registerTask(task)
         case None => throw new RuntimeException(
           "Network environment has not been properly instantiated.")

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
index 1a62234..06d34c7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -35,6 +35,13 @@ under the License.
 
 	<packaging>jar</packaging>
 
+	<!-- Allow users to pass custom kafka versions -->
+	<properties>
+		<kafka.version>0.8.1</kafka.version>
+		<rabbitmq.version>3.3.1</rabbitmq.version>
+		<flume-ng.version>1.5.0</flume-ng.version>
+	</properties>
+
 	<dependencies>
 
 		<dependency>
@@ -46,9 +53,13 @@ under the License.
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_2.10</artifactId>
-			<version>0.8.1</version>
+			<version>${kafka.version}</version>
 			<exclusions>
 				<exclusion>
+					<groupId>org.apache.zookeeper</groupId>
+					<artifactId>zookeeper</artifactId>
+				</exclusion>
+				<exclusion>
 					<groupId>com.sun.jmx</groupId>
 					<artifactId>jmxri</artifactId>
 				</exclusion>
@@ -90,13 +101,13 @@ under the License.
 		<dependency>
 			<groupId>com.rabbitmq</groupId>
 			<artifactId>amqp-client</artifactId>
-			<version>3.3.1</version>
+			<version>${rabbitmq.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flume</groupId>
 			<artifactId>flume-ng-core</artifactId>
-			<version>1.5.0</version>
+			<version>${flume-ng.version}</version>
 			<exclusions>
 				<exclusion>
 					<groupId>org.slf4j</groupId>
@@ -202,13 +213,27 @@ under the License.
 			<artifactId>spymemcached</artifactId>
 			<version>2.8.4</version>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>org.apache.sling</groupId>
 			<artifactId>org.apache.sling.commons.json</artifactId>
 			<version>2.0.6</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>0.9-SNAPSHOT</version>
+		</dependency>
+
+		<!-- Curator provides a Zookeeper test cluster, needed to test Kafka -->
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>2.5.0</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>
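
Since these are plain Maven properties, they can presumably be overridden on the command line when building the connectors, for example with `mvn clean install -Dkafka.version=0.8.1.1` (the version string here is only an illustration).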

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 0b30f6e..1753561 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -19,17 +19,22 @@ package org.apache.flink.streaming.connectors.kafka.api;
 
 import java.util.Properties;
 
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kafka.config.EncoderWrapper;
-import org.apache.flink.streaming.connectors.kafka.config.PartitionerWrapper;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaDistributePartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
+import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
+import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
 import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.util.NetUtils;
+
+import com.google.common.base.Preconditions;
 
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import kafka.serializer.DefaultEncoder;
+import kafka.serializer.StringEncoder;
 
 /**
  * Sink that emits its inputs to a Kafka topic.
@@ -38,38 +43,39 @@ import kafka.serializer.DefaultEncoder;
  * 		Type of the sink input
  */
 public class KafkaSink<IN> extends RichSinkFunction<IN> {
+
 	private static final long serialVersionUID = 1L;
 
 	private Producer<IN, byte[]> producer;
 	private Properties props;
 	private String topicId;
-	private String brokerAddr;
-	private boolean initDone = false;
+	private String zookeeperAddress;
 	private SerializationSchema<IN, byte[]> scheme;
-	private KafkaPartitioner<IN> partitioner;
+	private SerializableKafkaPartitioner partitioner;
+	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
 
 	/**
 	 * Creates a KafkaSink for a given topic. The partitioner distributes the
 	 * messages between the partitions of the topics.
 	 *
-	 * @param brokerAddr
-	 * 		Address of the Kafka broker (with port number).
+	 * @param zookeeperAddress
+	 * 		Address of the Zookeeper host (with port number).
 	 * @param topicId
 	 * 		ID of the Kafka topic.
 	 * @param serializationSchema
 	 * 		User defined serialization schema.
 	 */
-	public KafkaSink(String brokerAddr, String topicId,
+	public KafkaSink(String zookeeperAddress, String topicId,
 			SerializationSchema<IN, byte[]> serializationSchema) {
-		this(brokerAddr, topicId, serializationSchema, new KafkaDistributePartitioner<IN>());
+		this(zookeeperAddress, topicId, serializationSchema, (Class) null);
 	}
 
 	/**
 	 * Creates a KafkaSink for a given topic. The sink produces its input into
 	 * the topic.
 	 *
-	 * @param brokerAddr
-	 * 		Address of the Kafka broker (with port number).
+	 * @param zookeeperAddress
+	 * 		Address of the Zookeeper host (with port number).
 	 * @param topicId
 	 * 		ID of the Kafka topic.
 	 * @param serializationSchema
@@ -77,42 +83,63 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 * @param partitioner
 	 * 		User defined partitioner.
 	 */
-	public KafkaSink(String brokerAddr, String topicId,
-			SerializationSchema<IN, byte[]> serializationSchema, KafkaPartitioner<IN> partitioner) {
+	public KafkaSink(String zookeeperAddress, String topicId,
+			SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
+		NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
+		Preconditions.checkNotNull(topicId, "TopicID not set");
+		ClosureCleaner.ensureSerializable(partitioner);
+
+		this.zookeeperAddress = zookeeperAddress;
 		this.topicId = topicId;
-		this.brokerAddr = brokerAddr;
 		this.scheme = serializationSchema;
 		this.partitioner = partitioner;
 	}
 
+	public KafkaSink(String zookeeperAddress, String topicId,
+			SerializationSchema<IN, byte[]> serializationSchema, Class<? extends SerializableKafkaPartitioner> partitioner) {
+		NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
+		Preconditions.checkNotNull(topicId, "TopicID not set");
+		ClosureCleaner.ensureSerializable(partitioner);
+
+		this.zookeeperAddress = zookeeperAddress;
+		this.topicId = topicId;
+		this.scheme = serializationSchema;
+		this.partitionerClass = partitioner;
+	}
+
 	/**
 	 * Initializes the connection to Kafka.
 	 */
-	public void initialize() {
+	@Override
+	public void open(Configuration configuration) {
+
+		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperAddress);
+		String brokerAddress = kafkaTopicUtils.getLeaderBrokerAddressForTopic(topicId);
 
 		props = new Properties();
 
-		props.put("metadata.broker.list", brokerAddr);
+		props.put("metadata.broker.list", brokerAddress);
 		props.put("request.required.acks", "1");
 
 		props.put("serializer.class", DefaultEncoder.class.getCanonicalName());
-		props.put("key.serializer.class", EncoderWrapper.class.getCanonicalName());
-		props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
+		props.put("key.serializer.class", StringEncoder.class.getCanonicalName());
 
-		EncoderWrapper<IN> encoderWrapper = new EncoderWrapper<IN>(scheme);
-		encoderWrapper.write(props);
-
-		PartitionerWrapper<IN> partitionerWrapper = new PartitionerWrapper<IN>(partitioner);
-		partitionerWrapper.write(props);
+		if (partitioner != null) {
+			props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
+			// java serialization will do the rest.
+			props.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
+		}
+		if (partitionerClass != null) {
+			props.put("partitioner.class", partitionerClass);
+		}
 
 		ProducerConfig config = new ProducerConfig(props);
 
 		try {
 			producer = new Producer<IN, byte[]>(config);
 		} catch (NullPointerException e) {
-			throw new RuntimeException("Cannot connect to Kafka broker " + brokerAddr);
+			throw new RuntimeException("Cannot connect to Kafka broker " + brokerAddress, e);
 		}
-		initDone = true;
 	}
 
 	/**
@@ -123,10 +150,6 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 */
 	@Override
 	public void invoke(IN next) {
-		if (!initDone) {
-			initialize();
-		}
-
 		byte[] serialized = scheme.serialize(next);
 		producer.send(new KeyedMessage<IN, byte[]>(topicId, next, serialized));
 	}
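
A hedged usage sketch of the reworked sink (the `JavaDefaultStringSchema` name is an assumption; any `SerializationSchema<String, byte[]>` implementation would do):

    DataStream<String> stream = ...;

    // The sink now takes the Zookeeper address; the leader broker for the topic
    // is looked up via KafkaTopicUtils when open() is called.
    stream.addSink(new KafkaSink<String>("localhost:2181", "test", new JavaDefaultStringSchema()));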

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 5fae3b6..4eff870 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.ConnectorSource;
 import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Source that listens to a Kafka topic using the high level Kafka API.
@@ -41,9 +43,12 @@ import org.apache.flink.util.Collector;
  *            Type of the messages on the topic.
  */
 public class KafkaSource<OUT> extends ConnectorSource<OUT> {
+
 	private static final long serialVersionUID = 1L;
 
-	private final String zookeeperHost;
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+
+	private final String zookeeperAddress;
 	private final String groupId;
 	private final String topicId;
 
@@ -59,33 +64,52 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	/**
 	 * Creates a KafkaSource that consumes a topic.
 	 * 
-	 * @param zookeeperHost
+	 * @param zookeeperAddress
 	 *            Address of the Zookeeper host (with port number).
 	 * @param topicId
 	 *            ID of the Kafka topic.
+	 * @param groupId
+	 * 			   ID of the consumer group.
 	 * @param deserializationSchema
 	 *            User defined deserialization schema.
 	 * @param zookeeperSyncTimeMillis
 	 *            Synchronization time with zookeeper.
 	 */
-	public KafkaSource(String zookeeperHost, String topicId, String groupId,
-			DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
+	public KafkaSource(String zookeeperAddress, String topicId, String groupId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
 		super(deserializationSchema);
-		this.zookeeperHost = zookeeperHost;
+		this.zookeeperAddress = zookeeperAddress;
 		this.groupId = groupId;
 		this.topicId = topicId;
 		this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
 	}
 
-	public KafkaSource(String zookeeperHost, String topicId,
-			DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
-		this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema,
-				ZOOKEEPER_DEFAULT_SYNC_TIME);
+	/**
+	 * Creates a KafkaSource that consumes a topic.
+	 *
+	 * @param zookeeperAddress
+	 *            Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
+	 * @param deserializationSchema
+	 *            User defined deserialization schema.
+	 * @param zookeeperSyncTimeMillis
+	 *            Synchronization time with zookeeper.
+	 */
+	public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
+		this(zookeeperAddress, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis);
 	}
-
-	public KafkaSource(String zookeeperHost, String topicId,
-			DeserializationSchema<OUT> deserializationSchema) {
-		this(zookeeperHost, topicId, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
+	/**
+	 * Creates a KafkaSource that consumes a topic.
+	 *
+	 * @param zookeeperAddress
+	 *            Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
+	 * @param deserializationSchema
+	 *            User defined deserialization schema.
+	 */
+	public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema) {
+		this(zookeeperAddress, topicId, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
 	}
 
 	/**
@@ -93,7 +117,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	 */
 	private void initializeConnection() {
 		Properties props = new Properties();
-		props.put("zookeeper.connect", zookeeperHost);
+		props.put("zookeeper.connect", zookeeperAddress);
 		props.put("group.id", groupId);
 		props.put("zookeeper.session.timeout.ms", "10000");
 		props.put("zookeeper.sync.time.ms", Long.toString(zookeeperSyncTimeMillis));

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
new file mode 100644
index 0000000..f9dd21f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.api.config;
+
+import kafka.producer.Partitioner;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * Hacky wrapper to send an object instance through a Properties map.
+ *
+ * This works as follows:
+ * The recommended way of creating a KafkaSink is to specify a class name for the partitioner.
+ *
+ * Otherwise (if the user passed a (serializable) partitioner instance), we give Kafka Flink's PartitionerWrapper class.
+ * This is set in the key-value (java.util.Properties) map.
+ * In addition to that, we use Properties.put(Object, Object) to store the (serializable) partitioner instance.
+ * This is a hack because the put() method is called on the underlying HashMap.
+ *
+ * This PartitionerWrapper is instantiated with those Properties. From there, we extract the wrapped Partitioner instance.
+ *
+ * The serializable partitioner instance is put into the Properties HashMap and read back from there.
+ */
+public class PartitionerWrapper implements Partitioner {
+	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
+
+	private Partitioner wrapped;
+	public PartitionerWrapper(VerifiableProperties properties) {
+		wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
+	}
+
+	@Override
+	public int partition(Object value, int numberOfPartitions) {
+		return wrapped.partition(value, numberOfPartitions);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
index 616b279..7f5360b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
@@ -24,6 +24,8 @@ import java.util.Properties;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
@@ -38,32 +40,33 @@ import scala.collection.Seq;
  */
 public class KafkaTopicUtils {
 
-	public static void main(String[] args) {
-		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils("localhost:2181", 5000, 5000);
-//		TopicMetadata para4 = kafkaTopicUtils.getTopicInfo("para4");
-//		PartitionMetadata next = JavaConversions.asJavaCollection(para4.partitionsMetadata()).iterator().next();
-//		next.
-		System.out.println(kafkaTopicUtils.getLeaderBrokerAddressForTopic("para4"));
-	}
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtils.class);
 
 	private final ZkClient zkClient;
 
+	public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10000;
+	public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10000;
+
 	public KafkaTopicUtils(String zookeeperServer) {
-		this(zookeeperServer, 10000, 10000);
+		this(zookeeperServer, DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
 	}
 
-	public KafkaTopicUtils(String zookeeperServer, int sessionTimeoutMs, int connectionTimeoutMs) {
-		zkClient = new ZkClient(zookeeperServer, sessionTimeoutMs, connectionTimeoutMs,
+	public KafkaTopicUtils(String zookeeperAddress, int sessionTimeoutMs, int connectionTimeoutMs) {
+		zkClient = new ZkClient(zookeeperAddress, sessionTimeoutMs, connectionTimeoutMs,
 				new KafkaZKStringSerializer());
+		zkClient.waitUntilConnected();
 	}
 
 	public void createTopic(String topicName, int numOfPartitions, int replicationFactor) {
-		createTopic(topicName, numOfPartitions, replicationFactor, new Properties());
-	}
-
-	public void createTopic(String topicName, int numOfPartitions, int replicationFactor, Properties topicProperties) {
+		LOG.info("Creating Kafka topic '{}'", topicName);
 		Properties topicConfig = new Properties();
-		AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
+		if (topicExists(topicName)) {
+			if (LOG.isWarnEnabled()) {
+				LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
+			}
+		} else {
+			AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
+		}
 	}
 
 	public int getNumberOfPartitions(String topicName) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index a841e11..8ec298a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -23,12 +23,15 @@ import java.util.Map;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.ConnectorSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
 import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaIdleConsumerIterator;
 import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaMultiplePartitionsIterator;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
 import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
@@ -42,39 +45,48 @@ import org.slf4j.LoggerFactory;
  * @param <OUT>
  * 		Type of the messages on the topic.
  */
-public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
+public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
 
-	protected transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;
-	protected transient Map<Integer, KafkaOffset> partitions;
+	private final String topicId;
+	private final String zookeeperServerAddress;
+	private final int zookeeperSyncTimeMillis;
+	private final int waitOnEmptyFetchMillis;
+	private final KafkaOffset startingOffset;
 
-	private int partition;
+	private transient KafkaConsumerIterator iterator;
+	private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;
 
-	private int zookeeperSyncTimeMillis;
-	private int waitOnEmptyFetchMillis;
+	private transient Map<Integer, KafkaOffset> partitions;
 
 	/**
 	 * Creates a persistent Kafka source that consumes a topic.
+	 * If there are no new messages on the topic, this consumer will wait
+	 * 100 milliseconds before trying to fetch messages again.
+	 * The consumer will start consuming from the latest messages in the topic.
 	 *
-	 * @param zookeeperHost
+	 * @param zookeeperAddress
 	 * 		Address of the Zookeeper host (with port number).
 	 * @param topicId
 	 * 		ID of the Kafka topic.
 	 * @param deserializationSchema
 	 * 		User defined deserialization schema.
 	 */
-	public PersistentKafkaSource(String zookeeperHost, String topicId,
+	public PersistentKafkaSource(String zookeeperAddress, String topicId,
 			DeserializationSchema<OUT> deserializationSchema) {
-		this(zookeeperHost, topicId, deserializationSchema, 5000, 500);
+		this(zookeeperAddress, topicId, deserializationSchema, KafkaTopicUtils.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS, 100);
 	}
 
 	/**
 	 * Creates a persistent Kafka source that consumes a topic.
+	 * If there are no new messages on the topic, this consumer will wait
+	 * waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
+	 * The consumer will start consuming from the latest messages in the topic.
 	 *
-	 * @param zookeeperHost
+	 * @param zookeeperAddress
 	 * 		Address of the Zookeeper host (with port number).
 	 * @param topicId
 	 * 		ID of the Kafka topic.
@@ -85,9 +97,49 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 	 * @param waitOnEmptyFetchMillis
 	 * 		Time to wait before fetching for new message.
 	 */
-	public PersistentKafkaSource(String zookeeperHost, String topicId,
+	public PersistentKafkaSource(String zookeeperAddress, String topicId,
 			DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis) {
-		super(topicId, zookeeperHost, deserializationSchema);
+		this(zookeeperAddress, topicId, deserializationSchema, zookeeperSyncTimeMillis, waitOnEmptyFetchMillis, Offset.FROM_CURRENT);
+	}
+
+	/**
+	 * Creates a persistent Kafka source that consumes a topic.
+	 * If there are no new messages on the topic, this consumer will wait
+	 * waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
+	 *
+	 * @param zookeeperAddress
+	 * 		Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 * 		ID of the Kafka topic.
+	 * @param deserializationSchema
+	 * 		User defined deserialization schema.
+	 * @param zookeeperSyncTimeMillis
+	 * 		Synchronization time with zookeeper.
+	 * @param waitOnEmptyFetchMillis
+	 * 		Time to wait before fetching for new message.
+	 * @param startOffsetType
+	 * 		The offset to start from (beginning or current).
+	 */
+	public PersistentKafkaSource(String zookeeperAddress, String topicId,
+			DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis,
+			int waitOnEmptyFetchMillis, Offset startOffsetType) {
+		super(deserializationSchema);
+
+		this.topicId = topicId;
+		this.zookeeperServerAddress = zookeeperAddress;
+
+		switch (startOffsetType) {
+			case FROM_BEGINNING:
+				this.startingOffset = new BeginningOffset();
+				break;
+			case FROM_CURRENT:
+				this.startingOffset = new CurrentOffset();
+				break;
+			default:
+				this.startingOffset = new CurrentOffset();
+				break;
+		}
+
 		this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
 		this.waitOnEmptyFetchMillis = waitOnEmptyFetchMillis;
 	}
@@ -116,10 +168,8 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 			} else {
 				partitions = new HashMap<Integer, KafkaOffset>();
 
-				partition = indexOfSubtask;
-
 				for (int partitionIndex = indexOfSubtask; partitionIndex < numberOfPartitions; partitionIndex += numberOfSubtasks) {
-					partitions.put(partitionIndex, new CurrentOffset());
+					partitions.put(partitionIndex, startingOffset);
 				}
 
 				kafkaOffSet = new OperatorState<Map<Integer, KafkaOffset>>(partitions);
@@ -160,4 +210,8 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 			kafkaOffSet.update(partitions);
 		}
 	}
+
+	@Override
+	public void cancel() {
+	}
 }
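
For reference, a minimal usage sketch of the new constructor variant with an explicit start offset, assuming the streaming DataStream API touched elsewhere in this commit. This is illustrative only and not part of the patch: the topic name and ZooKeeper address are placeholders, and MyStringSchema stands in for any user-provided DeserializationSchema<String> implementation.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;

public class PersistentKafkaSourceExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Consume "my-topic" from its earliest available offset; sync with ZooKeeper
		// every second and wait 100 ms between fetch attempts on an empty topic.
		DataStream<String> stream = env.addSource(
				new PersistentKafkaSource<String>(
						"localhost:2181",        // ZooKeeper address (host:port), placeholder
						"my-topic",              // Kafka topic ID, placeholder
						new MyStringSchema(),    // hypothetical DeserializationSchema<String>
						1000,                    // zookeeperSyncTimeMillis
						100,                     // waitOnEmptyFetchMillis
						Offset.FROM_BEGINNING)); // start offset type

		stream.print();
		env.execute("PersistentKafkaSource example");
	}
}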

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
deleted file mode 100644
index f972e51..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
-import org.apache.flink.util.Collector;
-
-/**
- * Source that listens to a Kafka topic using the low level or simple Kafka API.
- *
- * @param <OUT>
- *            Type of the messages on the topic.
- */
-public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
-	private static final long serialVersionUID = 1L;
-
-	protected String topicId;
-	protected final String zookeeperServerAddress;
-	protected KafkaConsumerIterator iterator;
-
-	public SimpleKafkaSource(String topic, String zookeeperServerAddress,
-			DeserializationSchema<OUT> deserializationSchema) {
-		super(deserializationSchema);
-		this.topicId = topic;
-		this.zookeeperServerAddress = zookeeperServerAddress;
-	}
-
-	protected void setInitialOffset(Configuration config) throws InterruptedException {
-		iterator.initialize();
-	}
-
-	@Override
-	public void run(Collector<OUT> collector) throws Exception {
-		while (iterator.hasNext()) {
-			MessageWithMetadata msg = iterator.nextWithOffset();
-			OUT out = schema.deserialize(msg.getMessage());
-			collector.collect(out);
-		}
-	}
-
-	@Override
-	public void cancel() {
-	}
-
-
-	@Override
-	public void open(Configuration config) throws InterruptedException {
-//		initializeConnection();
-		setInitialOffset(config);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
index b1c258b..420c6db 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
@@ -30,11 +30,11 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaMultiplePartitionsIterator.class);
 
-	protected List<KafkaOnePartitionIterator> partitions;
+	protected List<KafkaSinglePartitionIterator> partitions;
 	protected final int waitOnEmptyFetch;
 
 	public KafkaMultiplePartitionsIterator(String hostName, String topic, Map<Integer, KafkaOffset> partitionsWithOffset, int waitOnEmptyFetch) {
-		partitions = new ArrayList<KafkaOnePartitionIterator>(partitionsWithOffset.size());
+		partitions = new ArrayList<KafkaSinglePartitionIterator>(partitionsWithOffset.size());
 
 		String[] hostAndPort = hostName.split(":");
 
@@ -44,7 +44,7 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 		this.waitOnEmptyFetch = waitOnEmptyFetch;
 
 		for (Map.Entry<Integer, KafkaOffset> partitionWithOffset : partitionsWithOffset.entrySet()) {
-			partitions.add(new KafkaOnePartitionIterator(
+			partitions.add(new KafkaSinglePartitionIterator(
 					host,
 					port,
 					topic,
@@ -55,7 +55,7 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 
 	@Override
 	public void initialize() throws InterruptedException {
-		for (KafkaOnePartitionIterator partition : partitions) {
+		for (KafkaSinglePartitionIterator partition : partitions) {
 			partition.initialize();
 		}
 	}
@@ -71,26 +71,33 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 	}
 
 	protected int lastCheckedPartitionIndex = -1;
+	private boolean gotNewMessage = false;
 
 	@Override
 	public MessageWithMetadata nextWithOffset() throws InterruptedException {
-		KafkaOnePartitionIterator partition;
+		KafkaSinglePartitionIterator partition;
 
 		while (true) {
 			for (int i = nextPartition(lastCheckedPartitionIndex); i < partitions.size(); i = nextPartition(i)) {
 				partition = partitions.get(i);
 
 				if (partition.fetchHasNext()) {
+					gotNewMessage = true;
 					lastCheckedPartitionIndex = i;
 					return partition.nextWithOffset();
 				}
 			}
 
-			try {
-				Thread.sleep(waitOnEmptyFetch);
-			} catch (InterruptedException e) {
-				e.printStackTrace();
+			// do not wait if a new message has been fetched
+			if (!gotNewMessage) {
+				try {
+					Thread.sleep(waitOnEmptyFetch);
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted while waiting for new messages", e);
+				}
 			}
+
+			gotNewMessage = false;
 		}
 	}
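
The loop above only backs off when neither the current round-robin pass nor the previous call produced a message. The following standalone sketch illustrates that polling pattern in isolation; the Partition interface and the class name are hypothetical stand-ins for KafkaSinglePartitionIterator, not connector code from the patch.

/** Illustrative sketch of the round-robin poll-with-backoff pattern used above. */
public class RoundRobinPollerSketch {

	/** Hypothetical stand-in for a single-partition iterator. */
	public interface Partition {
		/** Returns the next message, or null if nothing is currently available. */
		byte[] poll();
	}

	private final Partition[] partitions;
	private final long waitOnEmptyFetchMillis;
	private int lastCheckedPartition = -1;
	private boolean gotNewMessage = false;

	public RoundRobinPollerSketch(Partition[] partitions, long waitOnEmptyFetchMillis) {
		this.partitions = partitions;
		this.waitOnEmptyFetchMillis = waitOnEmptyFetchMillis;
	}

	/** Blocks until some partition yields a message. Assumes at least one partition. */
	public byte[] next() throws InterruptedException {
		while (true) {
			// one round-robin pass, starting after the partition that served us last
			for (int seen = 0, i = (lastCheckedPartition + 1) % partitions.length;
					seen < partitions.length;
					seen++, i = (i + 1) % partitions.length) {
				byte[] msg = partitions[i].poll();
				if (msg != null) {
					gotNewMessage = true;
					lastCheckedPartition = i;
					return msg;
				}
			}
			// only sleep if the previous call fetched nothing either,
			// so a busy source is never throttled by a single empty pass
			if (!gotNewMessage) {
				Thread.sleep(waitOnEmptyFetchMillis);
			}
			gotNewMessage = false;
		}
	}
}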
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaOnePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaOnePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaOnePartitionIterator.java
deleted file mode 100644
index ee6210a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaOnePartitionIterator.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.common.ErrorMapping;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.message.MessageAndOffset;
-
-/**
- * Iterates the records received from a partition of a Kafka topic as byte arrays.
- */
-public class KafkaOnePartitionIterator implements KafkaConsumerIterator, Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaOnePartitionIterator.class);
-
-	private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 10000L;
-
-	private List<String> hosts;
-	private String topic;
-	private int port;
-	private int partition;
-	private long readOffset;
-	private transient SimpleConsumer consumer;
-	private List<String> replicaBrokers;
-	private String clientName;
-	private String leadBroker;
-
-	private KafkaOffset initialOffset;
-	private transient Iterator<MessageAndOffset> iter;
-	private transient FetchResponse fetchResponse;
-
-	/**
-	 * Constructor with configurable wait time on empty fetch. For connecting to the Kafka service
-	 * we use the so called simple or low level Kafka API thus directly connecting to one of the brokers.
-	 *
-	 * @param hostName Hostname of a known Kafka broker
-	 * @param port Port of the known Kafka broker
-	 * @param topic Name of the topic to listen to
-	 * @param partition Partition in the chosen topic
-	 */
-	public KafkaOnePartitionIterator(String hostName, int port, String topic, int partition, KafkaOffset initialOffset) {
-		this.hosts = new ArrayList<String>();
-		hosts.add(hostName);
-		this.port = port;
-
-		this.topic = topic;
-		this.partition = partition;
-
-		this.initialOffset = initialOffset;
-
-		replicaBrokers = new ArrayList<String>();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Initializing a connection
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Initializes the connection by detecting the leading broker of
-	 * the topic and establishing a connection to it.
-	 */
-	public void initialize() throws InterruptedException {
-		PartitionMetadata metadata;
-		do {
-			metadata = findLeader(hosts, port, topic, partition);
-			try {
-				Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
-			} catch (InterruptedException e) {
-				throw new InterruptedException("Establishing connection to Kafka failed");
-			}
-		} while (metadata == null);
-
-		if (metadata.leader() == null) {
-			throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts.get(0)
-					+ ":" + port);
-		}
-
-		leadBroker = metadata.leader().host();
-		clientName = "Client_" + topic + "_" + partition;
-
-		consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
-
-		readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
-
-		resetFetchResponse(readOffset);
-	}
-
-	/**
-	 * Sets the partition to read from.
-	 *
-	 * @param partition
-	 * 		partition number
-	 */
-	public void setPartition(int partition) {
-		this.partition = partition;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Iterator methods
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Convenience method to emulate iterator behaviour.
-	 *
-	 * @return whether the iterator has a next element
-	 */
-	public boolean hasNext() {
-		return true;
-	}
-
-	/**
-	 * Returns the next message received from Kafka as a
-	 * byte array.
-	 *
-	 * @return next message as a byte array.
-	 */
-	public byte[] next() throws InterruptedException {
-		return nextWithOffset().getMessage();
-	}
-
-	public boolean fetchHasNext() throws InterruptedException {
-		synchronized (fetchResponse) {
-			if (!iter.hasNext()) {
-				resetFetchResponse(readOffset);
-				return iter.hasNext();
-			} else {
-				return true;
-			}
-		}
-	}
-
-	/**
-	 * Returns the next message and its offset received from
-	 * Kafka encapsulated in a POJO.
-	 *
-	 * @return next message and its offset.
-	 */
-	public MessageWithMetadata nextWithOffset() throws InterruptedException {
-
-		synchronized (fetchResponse) {
-			if (!iter.hasNext()) {
-				throw new RuntimeException(
-						"Trying to read when response is not fetched. Call fetchHasNext() first.");
-			}
-
-			MessageAndOffset messageAndOffset = iter.next();
-			long currentOffset = messageAndOffset.offset();
-
-			while (currentOffset < readOffset) {
-				messageAndOffset = iter.next();
-				currentOffset = messageAndOffset.offset();
-			}
-
-			readOffset = messageAndOffset.nextOffset();
-			ByteBuffer payload = messageAndOffset.message().payload();
-
-			byte[] bytes = new byte[payload.limit()];
-			payload.get(bytes);
-
-			return new MessageWithMetadata(messageAndOffset.offset(), bytes, partition);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Internal utilities
-	// --------------------------------------------------------------------------------------------
-
-	private void resetFetchResponse(long offset) throws InterruptedException {
-		FetchRequest req = new FetchRequestBuilder().clientId(clientName)
-				.addFetch(topic, partition, offset, 100000).build();
-
-		fetchResponse = consumer.fetch(req);
-
-		if (fetchResponse.hasError()) {
-			short code = fetchResponse.errorCode(topic, partition);
-
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
-			}
-
-			if (code == ErrorMapping.OffsetOutOfRangeCode()) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Asked for invalid offset {}, setting the offset to the latest.", offset);
-				}
-
-				readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
-			}
-			consumer.close();
-			consumer = null;
-			leadBroker = findNewLeader(leadBroker, topic, partition, port);
-		}
-
-		iter = fetchResponse.messageSet(topic, partition).iterator();
-	}
-
-	private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_topic,
-			int a_partition) {
-		PartitionMetadata returnMetaData = null;
-		loop:
-		for (String seed : a_hosts) {
-			SimpleConsumer consumer = null;
-			try {
-				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
-				List<String> topics = Collections.singletonList(a_topic);
-				TopicMetadataRequest req = new TopicMetadataRequest(topics);
-				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
-				List<TopicMetadata> metaData = resp.topicsMetadata();
-				for (TopicMetadata item : metaData) {
-					for (PartitionMetadata part : item.partitionsMetadata()) {
-						if (part.partitionId() == a_partition) {
-							returnMetaData = part;
-							break loop;
-						}
-					}
-				}
-			} catch (Exception e) {
-				throw new RuntimeException("Error communicating with Broker [" + seed
-						+ "] to find Leader for [" + a_topic + ", " + a_partition + "]", e);
-			} finally {
-				if (consumer != null) {
-					consumer.close();
-				}
-			}
-		}
-		if (returnMetaData != null) {
-			replicaBrokers.clear();
-			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
-				replicaBrokers.add(replica.host());
-			}
-		}
-		return returnMetaData;
-	}
-
-	private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws InterruptedException {
-		for (int i = 0; i < 3; i++) {
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Trying to find a new leader after Broker failure.");
-			}
-			boolean goToSleep = false;
-			PartitionMetadata metadata = findLeader(replicaBrokers, a_port, a_topic, a_partition);
-			if (metadata == null) {
-				goToSleep = true;
-			} else if (metadata.leader() == null) {
-				goToSleep = true;
-			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
-				// first time through if the leader hasn't changed give ZooKeeper a second to recover
-				// second time, assume the broker did recover before failover, or it was a non-Broker issue
-				//
-				goToSleep = true;
-			} else {
-				return metadata.leader().host();
-			}
-			if (goToSleep) {
-				try {
-					Thread.sleep(10000);
-				} catch (InterruptedException ie) {
-				}
-			}
-		}
-		throw new InterruptedException("Unable to find new leader after Broker failure.");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
new file mode 100644
index 0000000..4c1468a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+
+/**
+ * Iterates the records received from a partition of a Kafka topic as byte arrays.
+ */
+public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePartitionIterator.class);
+
+	private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 10000L;
+
+	private List<String> hosts;
+	private String topic;
+	private int port;
+	private int partition;
+	private long readOffset;
+	private transient SimpleConsumer consumer;
+	private List<String> replicaBrokers;
+	private String clientName;
+	private String leadBroker;
+
+	private KafkaOffset initialOffset;
+	private transient Iterator<MessageAndOffset> iter;
+	private transient FetchResponse fetchResponse;
+
+	/**
+	 * Constructor with configurable wait time on empty fetch. For connecting to the Kafka service
+	 * we use the so called simple or low level Kafka API thus directly connecting to one of the brokers.
+	 *
+	 * @param hostName Hostname of a known Kafka broker
+	 * @param port Port of the known Kafka broker
+	 * @param topic Name of the topic to listen to
+	 * @param partition Partition in the chosen topic
+	 */
+	public KafkaSinglePartitionIterator(String hostName, int port, String topic, int partition, KafkaOffset initialOffset) {
+		this.hosts = new ArrayList<String>();
+		hosts.add(hostName);
+		this.port = port;
+
+		this.topic = topic;
+		this.partition = partition;
+
+		this.initialOffset = initialOffset;
+
+		replicaBrokers = new ArrayList<String>();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Initializing a connection
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Initializes the connection by detecting the leading broker of
+	 * the topic and establishing a connection to it.
+	 */
+	public void initialize() throws InterruptedException {
+		PartitionMetadata metadata;
+		do {
+			metadata = findLeader(hosts, port, topic, partition);
+			try {
+				Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
+			} catch (InterruptedException e) {
+				throw new InterruptedException("Establishing connection to Kafka failed");
+			}
+		} while (metadata == null);
+
+		if (metadata.leader() == null) {
+			throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts.get(0)
+					+ ":" + port);
+					+ ":" + port + ")");
+
+		leadBroker = metadata.leader().host();
+		clientName = "Client_" + topic + "_" + partition;
+
+		consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+
+		readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
+
+		resetFetchResponse(readOffset);
+	}
+
+	/**
+	 * Sets the partition to read from.
+	 *
+	 * @param partition
+	 * 		partition number
+	 */
+	public void setPartition(int partition) {
+		this.partition = partition;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Iterator methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Convenience method to emulate iterator behaviour.
+	 *
+	 * @return whether the iterator has a next element
+	 */
+	public boolean hasNext() {
+		return true;
+	}
+
+	/**
+	 * Returns the next message received from Kafka as a
+	 * byte array.
+	 *
+	 * @return next message as a byte array.
+	 */
+	public byte[] next() throws InterruptedException {
+		return nextWithOffset().getMessage();
+	}
+
+	public boolean fetchHasNext() throws InterruptedException {
+		synchronized (fetchResponse) {
+			if (!iter.hasNext()) {
+				resetFetchResponse(readOffset);
+				return iter.hasNext();
+			} else {
+				return true;
+			}
+		}
+	}
+
+	/**
+	 * Returns the next message and its offset received from
+	 * Kafka encapsulated in a POJO.
+	 *
+	 * @return next message and its offset.
+	 */
+	public MessageWithMetadata nextWithOffset() throws InterruptedException {
+
+		synchronized (fetchResponse) {
+			if (!iter.hasNext()) {
+				throw new RuntimeException(
+						"Trying to read when response is not fetched. Call fetchHasNext() first.");
+			}
+
+			MessageAndOffset messageAndOffset = iter.next();
+			long currentOffset = messageAndOffset.offset();
+
+			while (currentOffset < readOffset) {
+				messageAndOffset = iter.next();
+				currentOffset = messageAndOffset.offset();
+			}
+
+			readOffset = messageAndOffset.nextOffset();
+			ByteBuffer payload = messageAndOffset.message().payload();
+
+			byte[] bytes = new byte[payload.limit()];
+			payload.get(bytes);
+
+			return new MessageWithMetadata(messageAndOffset.offset(), bytes, partition);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal utilities
+	// --------------------------------------------------------------------------------------------
+
+	private void resetFetchResponse(long offset) throws InterruptedException {
+		FetchRequest req = new FetchRequestBuilder().clientId(clientName)
+				.addFetch(topic, partition, offset, 100000).build();
+
+		fetchResponse = consumer.fetch(req);
+
+		if (fetchResponse.hasError()) {
+			short code = fetchResponse.errorCode(topic, partition);
+
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
+			}
+
+			if (code == ErrorMapping.OffsetOutOfRangeCode()) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Asked for invalid offset {}, setting the offset to the latest.", offset);
+				}
+
+				readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
+			}
+			consumer.close();
+			consumer = null;
+			leadBroker = findNewLeader(leadBroker, topic, partition, port);
+		}
+
+		iter = fetchResponse.messageSet(topic, partition).iterator();
+	}
+
+	private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_topic,
+			int a_partition) {
+		PartitionMetadata returnMetaData = null;
+		loop:
+		for (String seed : a_hosts) {
+			SimpleConsumer consumer = null;
+			try {
+				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
+				List<String> topics = Collections.singletonList(a_topic);
+				TopicMetadataRequest req = new TopicMetadataRequest(topics);
+				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+				List<TopicMetadata> metaData = resp.topicsMetadata();
+				for (TopicMetadata item : metaData) {
+					for (PartitionMetadata part : item.partitionsMetadata()) {
+						if (part.partitionId() == a_partition) {
+							returnMetaData = part;
+							break loop;
+						}
+					}
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Error communicating with Broker [" + seed
+						+ "] to find Leader for [" + a_topic + ", " + a_partition + "]", e);
+			} finally {
+				if (consumer != null) {
+					consumer.close();
+				}
+			}
+		}
+		if (returnMetaData != null) {
+			replicaBrokers.clear();
+			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
+				replicaBrokers.add(replica.host());
+			}
+		}
+		return returnMetaData;
+	}
+
+	private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws InterruptedException {
+		for (int i = 0; i < 3; i++) {
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Trying to find a new leader after Broker failure.");
+			}
+			boolean goToSleep = false;
+			PartitionMetadata metadata = findLeader(replicaBrokers, a_port, a_topic, a_partition);
+			if (metadata == null) {
+				goToSleep = true;
+			} else if (metadata.leader() == null) {
+				goToSleep = true;
+			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
+				// first time through if the leader hasn't changed give ZooKeeper a second to recover
+				// second time, assume the broker did recover before failover, or it was a non-Broker issue
+				//
+				goToSleep = true;
+			} else {
+				return metadata.leader().host();
+			}
+			if (goToSleep) {
+				try {
+					Thread.sleep(10000);
+				} catch (InterruptedException ie) {
+				}
+			}
+		}
+		throw new InterruptedException("Unable to find new leader after Broker failure.");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
index cb79556..c048ba1 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -25,7 +26,7 @@ import kafka.common.TopicAndPartition;
 import kafka.javaapi.OffsetResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 
-public abstract class KafkaOffset {
+public abstract class KafkaOffset implements Serializable {
 
 	public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
 			String clientName);
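
KafkaOffset instances are now kept per partition inside the source's OperatorState (see the PersistentKafkaSource change above), so they have to survive Java serialization. The small sketch below shows the round trip such an object must pass; it uses plain JDK serialization and a hypothetical helper class, and is not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper illustrating the serialization round trip a KafkaOffset must survive. */
public final class SerializationRoundTrip {

	public static byte[] serialize(Serializable obj) throws Exception {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		ObjectOutputStream oos = new ObjectOutputStream(bos);
		oos.writeObject(obj); // throws NotSerializableException if obj (or a field) is not Serializable
		oos.close();
		return bos.toByteArray();
	}

	public static Object deserialize(byte[] bytes) throws Exception {
		ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
		try {
			return ois.readObject();
		} finally {
			ois.close();
		}
	}
}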

http://git-wip-us.apache.org/repos/asf/flink/blob/c0917f23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
new file mode 100644
index 0000000..02c49df
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
+
+/**
+ * Enum controlling the offset behavior of the PersistentKafkaSource.
+ */
+public enum Offset {
+	/**
+	 * Read the Kafka topic from the beginning.
+	 */
+	FROM_BEGINNING,
+	/**
+	 * Read the topic from the current offset (default).
+	 */
+	FROM_CURRENT
+}
\ No newline at end of file